主頁 > 後端開發 > Dubbo原始碼分析

Dubbo原始碼分析

2021-04-11 10:19:01 後端開發

一,為什么要用Dubbo

1.為什么要用現成的框架呢?

如果我們自己去開發一個網路通信,需要考慮到

  • 底層網路通信協議的處理
  • 序列化和反序列化的處理作業

這些作業本身應該是通用的,應該是一個中間件服務,為整個公司提供遠程通信的服務,而不應該由業務開發人員來自己去實作,所以才有了這樣的 rpc 框架,使得我們呼叫遠程方法時就像呼叫本地方法那么簡單,不需要關心底層的通信邏輯,

2.大規模服務化對于服務治理的要求

當企業開始大規模的服務化以后,遠程通信帶來的弊端就越來越明顯了,

  • 服務鏈路變長了,如何實作對服務鏈路的跟蹤和監控
  • 服務的大規模集群使得服務之間需要依賴第三方注冊中心來解決服務的發現和服務的感知問題
  • 服務通信之間的例外,需要有一種保護機制防止一個節點故障引發大規模的系統故障,所以要有容錯機制
  • 服務大規模集群會是的客戶端需要引入負載均衡機制實作請求分發

dubbo 主要是一個分布式服務治理解決方案,那么什么是服務治理?服務治理主要是針對大規模服務化以后,服務之間的路由、負載均衡、容錯機制、服務降級這些問題的解決方案,而 Dubbo 實作的不僅僅是遠程服務通信,并且還解決了服務路由、負載、降級、容錯等功能,

二,Dubbo 的基本使用

1.dubbo-common

1)service

/**
 * @author yhd
 * @email yinhuidong1@xiaomi.com
 * @description TODO
 * @since 2021/4/2 0:32
 */
public interface LoginService {

    String login(String username,String password);
}

2.dubbo-provider

1)pom

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.apache.dubbo</groupId>
			<artifactId>dubbo</artifactId>
			<version>2.7.8</version>
		</dependency>
		<dependency>
			<groupId>org.slf4j</groupId>
			<artifactId>slf4j-api</artifactId>
		</dependency>
		<dependency>
			<groupId>ch.qos.logback</groupId>
			<artifactId>logback-classic</artifactId>
		</dependency>
		<dependency>
			<groupId>com.yhd</groupId>
			<artifactId>dubbo-common</artifactId>
			<version>0.0.1-SNAPSHOT</version>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>
		<dependency>
			<groupId>org.apache.zookeeper</groupId>
			<artifactId>zookeeper</artifactId>
			<version>3.6.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-framework</artifactId>
			<version>5.1.0</version>
		</dependency>
		<dependency>
			<groupId>org.apache.curator</groupId>
			<artifactId>curator-recipes</artifactId>
			<version>5.1.0</version>
		</dependency>
		<dependency>
			<groupId>com.101tec</groupId>
			<artifactId>zkclient</artifactId>
			<version>0.10</version>
		</dependency>

2)組態檔

server.port=8081
dubbo.application.name=dubbo-provider
dubbo.registry.address=zookeeper://localhost:2181
dubbo.registry.protocal=zookeeper
dubbo.protocol.name=dubbo
dubbo.protocol.port=20880

3)主啟動類

@EnableDubbo
@ComponentScan("com.yhd")
@SpringBootApplication
public class DubboProviderApplication {

	public static void main(String[] args) {
		SpringApplication.run(DubboProviderApplication.class, args);
	}

}

4)service

@DubboService
public class LoginServiceImpl implements LoginService {
    @Override
    public String login(String username, String password) {
        return "login success!";
    }
}

3.dubbo-consumer

1)Controller

@RestController
public class LoginController {

    @DubboReference
    private LoginService loginService;

    @GetMapping("login/{aaa}/{bbb}")
    public String login(@PathVariable("aaa")String aaa,@PathVariable("bbb") String bbb){
        return loginService.login(aaa,bbb);
    }
}

三,dubbo啟動原理

Dubbo 提供了幾種容器讓我們去啟動和發布服務

1.容器型別

Spring Container

自動加載 META-INF/spring 目錄下的所有 Spring 配置,

logback Container

自動裝配 logback 日志

Log4j Container

自動配置 log4j 的配置

Dubbo 提供了一個 Main.main 快速啟動相應的容器,默認情況下,只會啟動 spring 容器

2.原理分析

默認情況下,spring 容器,本質上,就是加在 spring ioc 容器,然后啟動一個 netty 服務實作服務的發布,所以并沒有特別多的黑科技,下面是spring 容器啟動的代碼

public void start() {
    String configPath =ConfigUtils.getProperty( "dubbo.spring.config");
    if (StringUtils.isEmpty(configPath)) {
        configPath =  "classpath*:META- - INF/spring/*.xml";
    }
    context =  new
	ClassPathXmlApplicationContext(configPath.split( "[,\ \\ \ s]+"),false);
    context.refresh();
    context.start();
}

四,Dubbo對注冊中心的支持

Dubbo 能夠支持的注冊中心有:consul、etcd、nacos、sofa、zookeeper、redis、multicast

1.Dubbo 集成 Zookeeper 的實作原理

在這里插入圖片描述

2.dubbo 每次都要連 zookeeper ?

不是每次發起一個請求的時候,都需要訪問注冊中心,是通過快取實作,

其他注冊中心的實作,核心本質是一樣的,都是為了管理服務地址,

3.多注冊中心支持

Dubbo 中可以支持多注冊中心,有的時候,客戶端需要用呼叫的遠程服務不在同一個注冊中心上,那么客戶端就需要配置多個注冊中心來訪問,

五,Dubbo僅僅是一個RPC框架?

Dubbo 的核心功能,提供服務注冊和服務發現以及基于 Dubbo 協議的遠程通信,Dubbo 從另一個方面來看也可以認為是一個服務治理生態,

  • Dubbo 可以支持市面上主流的注冊中心
  • Dubbo 提供了 Container 的支持,默認提供了 3 種 container,
  • Dubbo 對于 RPC 通信協議的支持,不僅僅是原生的 Dubbo 協議,它還圍繞著 rmi、hessian、http、webservice、thrift、rest

有了多協議的支持,使得其他 rpc 框架的應用程式可以快速的切入到 dubbo生態中, 同時,對于多協議的支持,使得不同應用場景的服務,可以選擇合適的協議來發布服務,并不一定要使用 dubbo 提供的長連接方式,

1.Dubbo 監控平臺安裝

Dubbo-Admin

  • 修 改 dubbo-admin-server/src/main/resources/application.properties中的配置資訊
  • mvn clean package 進行構建
  • mvn clean package 進行構建
  • 訪問 localhost:8080

2.Dubbo 的終端操作

Dubbo 里面提供了一種基于終端操作的方法來實作服務治理,

使用 telnet localhost 20880 連接到服務對應的埠,

1)常見命令

ls
ls: 顯示服務串列
ls -l: 顯示服務詳細資訊串列
ls XxxService: 顯示服務的方法串列
ls -l XxxService: 顯示服務的方法詳細資訊串列

ps
ps: 顯示服務埠串列
ps -l: 顯示服務地址串列
ps 20880: 顯示埠上的連接資訊
ps -l 20880: 顯示埠上的連接詳細資訊

cd
cd XxxService: 改變預設服務,當設定了預設服務,凡是需要輸入服務名作
為引數的命令,都可以省略服務引數
cd /: 取消預設服務

pwd
pwd: 顯示當前預設服務

count
count XxxService: 統計 1 次服務任意方法的呼叫情況
count XxxService 10: 統計 10 次服務任意方法的呼叫情況
count XxxService xxxMethod: 統計 1 次服務方法的呼叫情況
count XxxService xxxMethod 10: 統計 10 次服務方法的呼叫情況

六,負載均衡

1.負載均衡的背景

當服務端存在多個節點的集群時,zookeeper 上會維護不同集群節點,對于客戶端而言,他需要一種負載均衡機制來實作目標服務的請求負載,通過負載均衡,可以讓每個服務器節點獲得適合自己處理能力的負載,

Dubbo 里面默認就集成了負載均衡的演算法和實作,默認提供了 4 種負載均衡實作,

2.Dubbo 中負載均衡的應用

1)啟動兩臺一樣的服務

修改組態檔

dubbo.protocol.port=20881

2)代碼

@DubboService(loadbalance = "random")

3.Dubbo負載均衡演算法

1)RandomLoadBalance

權重隨機演算法,根據權重值進行隨機負載,

它的演算法思想很簡單,假設我們有一組服務器 servers = [A, B, C],他們對應的權重為weights = [5, 3, 2],權重總和為 10,現在把這些權重值平鋪在一維坐標值上,[0, 5) 區間屬于服務器 A,[5, 8) 區間屬于服務器 B,[8, 10) 區間屬于服務器 C,接下來通過亂數生成器生成一個范圍在 [0, 10) 之間的亂數,然后計算這個亂數會落到哪個區間上,比如數字 3 會落到服務器 A 對應的區間上,此時回傳服務器 A 即可,權重越大的機器,在坐標軸上對應的區間范圍就越大,因此亂數生成器生成的數字就會有更大的概率落到此區間內,只要亂數生成器產生的亂數分布性很好,在經過多次選擇后,每個服務器被選中的次數比例接近其權重比例,

2)LeastActiveLoadBalance

最少活躍呼叫數演算法,活躍呼叫數越小,表明該服務提供者效率越高,單位時間內可處理更多的請求這個是比較科學的負載均衡演算法,

每個服務提供者對應一個活躍數 active,初始情況下,所有服務提供者活躍數均為 0,每收到一個請求,活躍數加 1,完成請求后則將活躍數減 1,在服務運行一段時間后,性能好的服務提供者處理請求的速度更快,因此活躍數下降的也越快,此時這樣的服務提供者能夠優先獲取到新的服務請求,

3)ConsistentHashLoadBalance

hash 一致性演算法,相同引數的請求總是發到同一提供者,

當某一臺提供者掛時,原本發往該提供者的請求,基于虛擬節點,平攤到其它提供者,不會引起劇烈變動,

4)RoundRobinLoadBalance

加權輪詢演算法

所謂輪詢是指將請求輪流分配給每臺服務器,舉個例子,我們有三臺服務器 A、B、C,我們將第一個請求分配給服務器 A,第二個請求分配給服務器 B,第三個請求分配給服務器 C,第四個請求再次分配給服務器 A,這個程序就叫做輪詢,輪詢是一種無狀態負載均衡演算法,實作簡單,適用于每臺服務器性能相近的場景下,但現實情況下,我們并不能保證每臺服務器性能均相近,如果我們將等量的請求分配給性能較差的服務器,這顯然是不合理的,因此,這個時候我們需要對輪詢程序進行加權,以調控每臺服務器的負載,經過加權后,每臺服務器能夠得到的請求數比例,接近或等于他們的權重比,比如服務器 A、B、C 權重比為 5:2:1,那么在 8 次請求中,服務器 A 將收到其中的 5 次請求,服務器 B 會收到其中的 2 次請求,服務器 C 則收到其中的 1次請求,

5)一致性 hash 演算法原理

七,集群容錯

網路通信會有很多不確定因素,比如網路延遲、網路中斷、服務例外等,會造成當前這次請求出現失敗, 當服務通信出現這個問題時,需要采取一定的措施應對,而 dubbo 中提供了容錯機制來優雅處理這種錯誤,

在集群呼叫失敗時,Dubbo 提供了多種容錯方案,預設為 failover 重試,

@DubboService(loadbalance = "random",cluster = "failsafe")

1.Failover Cluster

失敗自動切換,當出現失敗,重試其它服務器,(預設),

通常用于讀操作,但重試會帶來更長延遲,

可通過 retries=“2” 來設定重試次數(不含第一次),

2.Failfast Cluster

快速失敗,只發起一次呼叫,失敗立即報錯,

通常用于非冪等性的寫操作,比如新增記錄,

3.Failsafe Cluster

失敗安全,出現例外時,直接忽略,

通常用于寫入審計日志等操作,

4.Failback Cluster

失敗自動恢復,后臺記錄失敗請求,定時重發,

通常用于訊息通知操作,

5.Forking Cluster

并行呼叫多個服務器,只要一個成功即回傳,

通常用于實時性要求較高的讀操作,但需要浪費更多服務資源,

可通過 forks=“2” 來設定最大并行數,

6.Broadcast Cluster

廣播呼叫所有提供者,逐個呼叫,任意一臺報錯則報錯,(2.1.0 開始支持)

通常用于通知所有提供者更新快取或日志等本地資源資訊,

在實際應用中 查詢陳述句容錯策略建議使用默認 Failover Cluster ,而增刪改 建議使用Failfast Cluster 或者 使用 Failover Cluster(retries=”0”) 策略 防止出現資料 重復添加等等其它問題!建議在設計介面時候把查詢介面方法單獨做一個介面提供查詢,

八,服務降級

1.降級的概念

當某個非關鍵服務出現錯誤時,可以通過降級功能來臨時屏蔽這個服務,降級可以有幾個層面的分類: 自動降級和人工降級; 按照功能可以分為:讀服務降級和寫服務降級;

  • 對一些非核心服務進行人工降級,在大促之前通過降級開關關閉哪些推薦內容、評價等對主流程沒有影響的功能,
  • 故障降級,比如呼叫的遠程服務掛了,網路故障、或者 RPC 服務回傳例外, 那么可以直接降級,降級的方案比如設定默認值、采用兜底資料(系統推薦的行為廣告掛了,可以提前準備靜態頁面做回傳)等等,
  • 限流降級,在秒殺這種流量比較集中并且流量特別大的情況下,因為突發訪問量特別大可能會導致系統支撐不了,這個時候可以采用限流來限制訪問量,當達到閥值時,后續的請求被降級,比如進入排隊頁面,比如跳轉到錯誤頁(活動太火爆,稍后重試等),

那么,Dubbo 中如何實作服務降級呢?Dubbo 中提供了一個 mock 的配置,可以通過mock 來實作當服務提供方出現網路例外或者掛掉以后,客戶端不拋出例外,而是通過Mock 資料回傳自定義的資料,

2.Dubbo 實作服務降級

dubbo-client 端創建一個 mock 類,當出現服務降級時,會被呼叫

/**
 * @author yhd
 * @email yinhuidong1@xiaomi.com
 * @description 服務降級兜底類
 * @since 2021/4/2 15:28
 */
public class MockSayHelloService implements LoginService {
    @Override
    public String login(String username, String password) {
        return "Sorry,  服務端發生例外,被降級啦!";
    }
}

在消費方的主街上配置:

    @DubboReference(mock = "com.yhd.dubboconsumer.mock.MockSayHelloService",
            timeout = 1000, loadbalance = "random", cluster = "failfast",check = false)
    private LoginService loginService;

3.啟動時檢查

Dubbo 預設會在啟動時檢查依賴的服務是否可用,不可用時會拋出例外,阻止 Spring初始化完成,以便上線時,能及早發現問題,默認 check=“true”,

可以通過 check=“false” 關閉檢查,比如,測驗時,有些服務不關心,或者出現了回圈依賴,必須有一方先啟動,

registry、reference、consumer 都可以配置 check 這個屬性.

    @DubboReference(mock = "com.yhd.dubboconsumer.mock.MockSayHelloService",
            timeout = 1000, loadbalance = "random", cluster = "failfast",check = false)
    private LoginService loginService;

4.多版本支持

當一個介面實作,出現不兼容升級時,可以用版本號過渡,版本號不同的服務相互間不參考,

可以按照以下的步驟進行版本遷移:

  • 在低壓力時間段,先升級一半提供者為新版本
  • 再將所有消費者升級為新版本
  • 然后將剩下的一半提供者升級為新版本

5.主機系結

1)默認的主機系結方式

  • 通過 LocalHost.getLocalHost()獲取本機地址,
  • 如果是 127.*等 loopback(環路地址)地址,則掃描各網卡,獲取網卡 IP,
    • 如果是 springboot,修改配置:dubbo.protocol.host=””
    • 如果注冊地址獲取不正確,可以通過在 dubbo.xml 中加入主機地址的配置,
<dubbo:protocol host="205.182.23.201">

2)預設主機埠

dubbo: 20880
rmi: 1099
http: 80
hessian: 80
webservice: 80
memcached: 11211
redis: 6379

九,Dubbo 新的功能

1.動態配置規則

動態配置是 Dubbo2.7 版本引入的一個新的功能,簡單來說,就是把 dubbo.properties中的屬性進行集中式存盤,存盤在其他的服務器上,

那么如果需要用到集中式存盤,那么還需要一些配置中心的組件來支撐,目前 Dubbo 能支持的配置中心有:apollo、nacos、zookeeper

從另外一個角度來看,我們之前用 zookeeper 實作服務注冊和發現,本質上就是使用 zookeeper 實作了配置中心,這個配置中心只是維護了服務注冊和服務感知的功能,在 2.7 版本中,dubbo 對配置中心做了延展,除了服務注冊之外,還可以把其他的資料存盤在 zookeeper 上,從而更好的進行維護,

1)在 dubboadmin 添加配置

應用名稱可以是 global,或者對應當前服務的應用名,如果是 global 表示全域配置,針對所有應用可見,

配置的內容,實際就是 dubbo.properties 中配置的基本資訊,只是同意存盤在了zookeeper 上,

2)本地的組態檔添加配置中心

application.properties 中添加配置中心的配置項,app-name對應的是上一步創建的配置項中的應用名.

dubbo.config-center.address= zookeeper://192.168.13.106 6 :2181
dubbo.config-center.app- - name= spring-boot-provider
#需要注意的是,存在于配置中心上的配置項,本地仍然需要配置一份,所以下面這些配置一定要加上,否則啟動不了,這樣做的目的是保證可靠性
dubbo.application.name= spring-boot-provider
dubbo.protocol.port= 20880
dubbo.protocol.name= dubbo
dubbo.registry.address= zookeeper://192.168.13.102:2181?backup=192.168.13.103:2181,192.168.13.104:2181

3)配置的優先級

引入配置中心后,配置的優先級就需要關注了,默認情況下,外部配置的優先級最高,也就是意味著配置中心上的配置會覆寫本地的配置,當然我們也可以調整優先級,

dubbo.config-center.highest-priority=false

4)配置中心的原理

默認所有的配置都存盤在/dubbo/config 節點,

namespace,用于不同配置的環境隔離,

config,Dubbo 約定的固定節點,不可更改,所有配置和服務治理規則都存盤在此節點下,

dubbo/application,分別用來隔離全域配置、應用級別配置:dubbo 是默認 group 值,

application 對應應用名,

dubbo.properties,此節點的 node value 存盤具體配置內容,

在這里插入圖片描述

2.元資料中心

Dubbo2.7 的另外一個新的功能,就是增加了元資料的配置,

在 Dubbo2.7 之前,所有的配置資訊,比如服務介面名稱、重試次數、版本號、負載策略、容錯策略等等,所有引數都是基于 url 形式配置在 zookeeper 上的,這種方式會造成一些問題:

  • url 內容過多,導致資料存盤空間增大
  • url 需要涉及到網路傳輸,資料量過大會造成網路傳輸過慢
  • 網路傳輸慢,會造成服務地址感知的延遲變大,影響服務的正常回應

服務提供者這邊的配置引數有 30 多個,有一半是不需要作為注冊中心進行存盤和傳輸地的,而消費者這邊可配置的引數有 25 個以上,只有個別是需要傳遞到注冊中心的,所以,在 Dubbo2.7 中對元資料進行了改造,簡單來說,就是把屬于服務治理的資料發布到注冊中心,其他的配置資料統一發布到元資料中心,這樣一來大大降低了注冊中心的負載,

1)元資料中心配置

元資料中心目前支持 redis 和 zookeeper,官方推薦是采用 redis,畢竟 redis 本身對于非結構化存盤的資料讀寫性能比較高,當然,也可以使用 zookeeper 來實作,將注冊中心地址、元資料中心地址等配置集中管理,可以做到統一環境、減少開發側感知,官網可查詢外部化配置,不過描述過于簡略,

dubbo.metadata-report.address= zookeeper://192.168.13.106:2181
dubbo.registry.simplified= true 
#注冊到注冊中心的 URL 是否采用精簡模式的(與低版本兼容)

十,Dubbo 中的 SPI 機制

dubbo版本2.7.2,

1.Java SPI

SPI 是 JDK 內置的一種服務提供發現機制,目前市面上有很多框架都是用它來做服務的擴展發現,簡單來說,它是一種動態替換發現的機制,舉個簡單的例子,我們想在運行時動態給它添加實作,你只需要添加一個實作,然后把新的實作描述給 JDK 知道就行了,如 JDBC、日志框架都有用到,

在這里插入圖片描述

1)實作SPI需要遵循的標準

  • 需要在 classpath 下創建一個目錄,該目錄命名必須是:META-INF/service
  • 在該目錄下創建一個 properties 檔案,該檔案需要滿足以下幾個條件
    • 檔案名必須是擴展的介面的全路徑名稱
    • 檔案內部描述的是該擴展介面的所有實作類
    • 檔案的編碼格式是 UTF-8
  • 通過 java.util.ServiceLoader 的加載機制來發現

2)SPI的實際應用

JDK 本身提供了資料訪問的 api,在 java.sql 這個包里面,

java.sql.Driver 的原始碼,Driver 并沒有實作,而是提供了一套標準的 api 介面,

在這里插入圖片描述

通過 SPI 機制把 java.sql.Driver 和 mysql 的驅動做了集成,達到了各個資料庫廠商自己去實作資料庫連接,jdk 本身不關心你怎么實作,

門面模式?配接器模式?

3)SPI的缺點

  • JDK 標準的 SPI 會一次性加載實體化擴展點的所有實作,什么意思呢?就是如果你在 META-INF/service 下的檔案里面加了 N個實作類,那么 JDK 啟動的時候都會一次性全部加載,那么如果有的擴展點實作初始化很耗時或者如果有些實作類并沒有用到,那么會很浪費資源
  • 如果擴展點加載失敗,會導致呼叫方報錯,而且這個錯誤很難定位到是這個原因,

2.Dubbo 優化后的 SPI

1)基于 Dubbo SPI 的實作自己的擴展

Dubbo 的 SPI 擴展機制,有兩個規則

  • 需要在 resource 目錄下配置 META-INF/dubbo 或者 META-INF/dubbo/internal 或者 META-INF/services,并基于 SPI 介面去創建一個檔案,
  • 檔案名稱和介面名稱保持一致,檔案內容和 SPI 有差異,內容是 KEY 對應 Value

Dubbo 針對的擴展點非常多,可以針對協議、攔截、集群、路由、負載均衡、序列化、容器… 幾乎里面用到的所有功能,都可以實作自己的擴展,這個是 dubbo 比較強大的一點,

在這里插入圖片描述

2)擴展協議擴展點

  • 創建如下結構,添加 META-INF.dubbo 檔案,類名和 Dubbo 提供的協議擴展點介面保持一致,

在這里插入圖片描述

myProtocol=com.yhd.dubboprovider.diy.MyProtocol
  • 創建 MyProtocol 協議類
    • 可以實作自己的協議,我們為了模擬協議產生了作用,修改一個埠
public class MyProtocol implements Protocol {
    @Override
    public int getDefaultPort() {
        return 8888;
    }

    @Override
    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        return null;
    }

    @Override
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        return null;
    }

    @Override
    public void destroy() {

    }
}
  • 在呼叫處執行如下代碼
Protocol protocol=ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol"); 
System.out.print(protocol.getDefaultPort)
  • 輸出結果,可以看到運行結果,是執行的自定義的協議擴展點,
  • 總結:總的來說,思路和 SPI 是差不多,都是基于約定的路徑下制定組態檔,目的,通過配置的方式輕松實作功能的擴展,

一定有一個地方通過讀取指定路徑下的所有檔案進行 load,然后講對應的結果保存到一個 map 中,key 對應為名稱,value 對應為實作類,那么這個實作,一定就在 ExtensionLoader 中了,

3.Dubbo 的擴展點原理實作

Dubbo SPI和JDK SPI配置的不同,在Dubbo SPI中可以通過鍵值對的方式進行配置,這樣就可以按需加載指定的實作類,

Dubbo SPI的相關邏輯都被封裝到ExtensionLoader類中,通過ExtensionLoader我們可以加載指定的實作類,一個擴展介面就對應一個ExtensionLoader物件,在這里我們把它稱為:擴展點加載器,

1)屬性

public class ExtensionLoader<T> {
    
    //擴展點組態檔的路徑,可以從3個地方加載到擴展點組態檔
    private static final String SERVICES_DIRECTORY = "META-INF/services/";
    private static final String DUBBO_DIRECTORY = "META-INF/dubbo/";
    private static final String DUBBO_INTERNAL_DIRECTORY = DUBBO_DIRECTORY + "internal/";   
    //擴展點加載器的集合
    private static final ConcurrentMap<Class<?>, ExtensionLoader<?>> EXTENSION_LOADERS = new ConcurrentHashMap<Class<?>, ExtensionLoader<?>>();
    //擴展點實作的集合
    private static final ConcurrentMap<Class<?>, Object> EXTENSION_INSTANCES = new ConcurrentHashMap<Class<?>, Object>();
    //擴展點名稱和實作的映射快取
    private final ConcurrentMap<Class<?>, String> cachedNames = new ConcurrentHashMap<Class<?>, String>();
    //拓展點實作類集合快取
    private final Holder<Map<String, Class<?>>> cachedClasses = new Holder<Map<String, Class<?>>>();
    //擴展點名稱和@Activate的映射快取
    private final Map<String, Activate> cachedActivates = new ConcurrentHashMap<String, Activate>();
    //擴展點實作的快取
    private final ConcurrentMap<String, Holder<Object>> cachedInstances = new ConcurrentHashMap<String, Holder<Object>>();
}

ExtensionLoader會把不同的擴展點配置和實作都快取起來,同時,Dubbo在官網上也給了我們提醒:擴展點使用單一實體加載(請確保擴展實作的執行緒安全性),快取在 ExtensionLoader中,下面我們看幾個重點方法,

2)獲取擴展點加載器

我們首先通過ExtensionLoader.getExtensionLoader() 方法獲取一個 ExtensionLoader 實體,它就是擴展點加載器,然后再通過 ExtensionLoader 的 getExtension 方法獲取拓展類物件,這其中,getExtensionLoader 方法用于從快取中獲取與拓展類對應的 ExtensionLoader,若快取未命中,則創建一個新的實體,

public static <T> ExtensionLoader<T> getExtensionLoader(Class<T> type) {
    if (type == null)
        throw new IllegalArgumentException("Extension type == null");
    if (!type.isInterface()) {
        throw new IllegalArgumentException("Extension type(" + type + ") is not interface!");
    }
    if (!withExtensionAnnotation(type)) {
        throw new IllegalArgumentException("Extension type(" + type +
                ") is not extension, because WITHOUT @" + SPI.class.getSimpleName() + " Annotation!");
    }
    ExtensionLoader<T> loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    if (loader == null) {
        EXTENSION_LOADERS.putIfAbsent(type, new ExtensionLoader<T>(type));
        loader = (ExtensionLoader<T>) EXTENSION_LOADERS.get(type);
    }
    return loader;
}

比如你可以通過下面這樣,來獲取Protocol介面的ExtensionLoader實體:

ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);

就可以拿到擴展點加載器的物件實體:

com.alibaba.dubbo.common.extension.ExtensionLoader[com.alibaba.dubbo.rpc.Protocol]

3)獲取擴展類物件

上一步我們已經拿到加載器,然后可以根據加載器實體,通過擴展點的名稱獲取擴展類物件,

public T getExtension(String name) {
    //校驗擴展點名稱的合法性
    if (name == null || name.length() == 0)
        throw new IllegalArgumentException("Extension name == null");
    // 獲取默認的拓展實作類
    if ("true".equals(name)) {
        return getDefaultExtension();
    }
    //用于持有目標物件
    Holder<Object> holder = cachedInstances.get(name);
    if (holder == null) {
        cachedInstances.putIfAbsent(name, new Holder<Object>());
        holder = cachedInstances.get(name);
    }
    Object instance = holder.get();
    if (instance == null) {
        synchronized (holder) {
            instance = holder.get();
            if (instance == null) {
                instance = createExtension(name);
                holder.set(instance);
            }
        }
    }
    return (T) instance;
}

它先嘗試從快取中獲取,未命中則創建擴展物件,那么它的創建程序是怎樣的呢?

private T createExtension(String name) {
    //從組態檔中獲取所有的擴展類,Map資料結構
    //然后根據名稱獲取對應的擴展類
    Class<?> clazz = getExtensionClasses().get(name);
    if (clazz == null) {
        throw findException(name);
    }
    try {
        //通過反射創建實體,然后放入快取
        T instance = (T) EXTENSION_INSTANCES.get(clazz);
        if (instance == null) {
            EXTENSION_INSTANCES.putIfAbsent(clazz, clazz.newInstance());
            instance = (T) EXTENSION_INSTANCES.get(clazz);
        }
        //注入依賴
        injectExtension(instance);
        Set<Class<?>> wrapperClasses = cachedWrapperClasses;
        if (wrapperClasses != null && !wrapperClasses.isEmpty()) {
            // 包裝為Wrapper實體
            for (Class<?> wrapperClass : wrapperClasses) {
                instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
            }
        }
        return instance;
    } catch (Throwable t) {
        throw new IllegalStateException("Extension instance(name: " + name + ", class: " +
                type + ")  could not be instantiated: " + t.getMessage(), t);
    }
}

這里的重點有兩個,依賴注入和Wrapper包裝類,它們是Dubbo中IOC 與 AOP 的具體實作,

①依賴注入

向拓展物件中注入依賴,它會獲取類的所有方法,判斷方法是否以 set 開頭,且方法僅有一個引數,且方法訪問級別為 public,就通過反射設定屬性值,所以說,Dubbo中的IOC僅支持以setter方式注入,

private T injectExtension(T instance) {
    try {
        if (objectFactory != null) {
            for (Method method : instance.getClass().getMethods()) {
                if (method.getName().startsWith("set")
                        && method.getParameterTypes().length == 1
                        && Modifier.isPublic(method.getModifiers())) {
                    Class<?> pt = method.getParameterTypes()[0];
                    try {
                        String property = method.getName().length() > 3 ? method.getName().substring(3, 4).toLowerCase() + method.getName().substring(4) : "";
                        Object object = objectFactory.getExtension(pt, property);
                        if (object != null) {
                            method.invoke(instance, object);
                        }
                    } catch (Exception e) {
                        logger.error("fail to inject via method " + method.getName()
                                + " of interface " + type.getName() + ": " + e.getMessage(), e);
                    }
                }
            }
        }
    } catch (Exception e) {
        logger.error(e.getMessage(), e);
    }
    return instance;
}

②Wrapper

它會將當前 instance 作為引數傳給 Wrapper 的構造方法,并通過反射創建 Wrapper 實體, 然后向 Wrapper 實體中注入依賴,最后將 Wrapper 實體再次賦值給 instance 變數,說起來可能比較繞,我們直接看下它最后生成的物件就明白了,

我們以DubboProtocol為例,它包裝后的物件為:

在這里插入圖片描述

綜上所述,如果我們獲取一個擴展類物件,最后拿到的就是這個Wrapper類的實體,

就像這樣:

ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);
Protocol extension = extensionLoader.getExtension("dubbo");
System.out.println(extension);

輸出為:com.alibaba.dubbo.rpc.protocol.ProtocolListenerWrapper@4cdf35a9

4)獲取所有的擴展類

在我們通過名稱獲取擴展類物件之前,首先需要根據組態檔決議出所有的擴展類,

它是一個擴展點名稱和擴展類的映射表Map<String, Class<?>>

首先,還是從快取中cachedClasses獲取,如果沒有就呼叫loadExtensionClasses從組態檔中加載,組態檔有三個路徑:

  • META-INF/services/
  • META-INF/dubbo/
  • META-INF/dubbo/internal/

先嘗試從快取中獲取,

private Map<String, Class<?>> getExtensionClasses() {
    //從快取中獲取
    Map<String, Class<?>> classes = cachedClasses.get();
    if (classes == null) {
        synchronized (cachedClasses) {
            classes = cachedClasses.get();
            if (classes == null) {
                //加載擴展類
                classes = loadExtensionClasses();
                cachedClasses.set(classes);
            }
        }
    }
    return classes;
}   

如果沒有,就呼叫loadExtensionClasses從組態檔中讀取,

private Map<String, Class<?>> loadExtensionClasses() {
    //獲取 SPI 注解,這里的 type 變數是在呼叫 getExtensionLoader 方法時傳入的
    final SPI defaultAnnotation = type.getAnnotation(SPI.class);
    if (defaultAnnotation != null) {
        String value = defaultAnnotation.value();
        if ((value = value.trim()).length() > 0) {
            String[] names = NAME_SEPARATOR.split(value);
            if (names.length > 1) {
                throw new IllegalStateException("more than 1 default extension 
                    name on extension " + type.getName()+ ": " + Arrays.toString(names));
            }
            //設定默認的擴展名稱,參考getDefaultExtension 方法
            //如果名稱為true,就是呼叫默認擴贊類
            if (names.length == 1) cachedDefaultName = names[0];
        }
    }
    //加載指定路徑的組態檔
    Map<String, Class<?>> extensionClasses = new HashMap<String, Class<?>>();
    loadDirectory(extensionClasses, DUBBO_INTERNAL_DIRECTORY);
    loadDirectory(extensionClasses, DUBBO_DIRECTORY);
    loadDirectory(extensionClasses, SERVICES_DIRECTORY);
    return extensionClasses;
}

以Protocol介面為例,獲取到的實作類集合如下,我們就可以根據名稱加載具體的擴展類物件,

{
    registry=class com.alibaba.dubbo.registry.integration.RegistryProtocol
    injvm=class com.alibaba.dubbo.rpc.protocol.injvm.InjvmProtocol
    thrift=class com.alibaba.dubbo.rpc.protocol.thrift.ThriftProtocol
    mock=class com.alibaba.dubbo.rpc.support.MockProtocol
    dubbo=class com.alibaba.dubbo.rpc.protocol.dubbo.DubboProtocol
    http=class com.alibaba.dubbo.rpc.protocol.http.HttpProtocol
    redis=class com.alibaba.dubbo.rpc.protocol.redis.RedisProtocol
    rmi=class com.alibaba.dubbo.rpc.protocol.rmi.RmiProtocol
}

在這里插入圖片描述

4.自適應擴展機制

在Dubbo中,很多拓展都是通過 SPI 機制進行加載的,比如 Protocol、Cluster、LoadBalance 等,這些擴展并非在框架啟動階段就被加載,而是在擴展方法被呼叫的時候,根據URL物件引數進行加載,

那么,Dubbo就是通過自適應擴展機制來解決這個問題,

自適應拓展機制的實作邏輯是這樣的:

首先 Dubbo 會為拓展介面生成具有代理功能的代碼,然后通過 javassist 或 jdk 編譯這段代碼,得到 Class 類,最后再通過反射創建代理類,在代理類中,就可以通過URL物件的引數來確定到底呼叫哪個實作類,

1)Adaptive注解

在開始之前,我們有必要先看一下與自適應拓展息息相關的一個注解,即 Adaptive 注解,

@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE, ElementType.METHOD})
public @interface Adaptive {
    String[] value() default {};
}

從上面的代碼中可知,Adaptive 可注解在類或方法上,

  • 標注在類上
    Dubbo 不會為該類生成代理類,
  • 標注在方法上
    Dubbo 則會為該方法生成代理邏輯,表示當前方法需要根據 引數URL 呼叫對應的擴展點實作,

2)獲取自適應拓展類

getAdaptiveExtension 方法是獲取自適應拓展的入口方法,

public T getAdaptiveExtension() {
    // 從快取中獲取自適應拓展
    Object instance = cachedAdaptiveInstance.get();
    if (instance == null) {
        if (createAdaptiveInstanceError == null) {
            synchronized (cachedAdaptiveInstance) {
                instance = cachedAdaptiveInstance.get();
                //未命中快取,則創建自適應拓展,然后放入快取
                if (instance == null) {
                    try {
                        instance = createAdaptiveExtension();
                        cachedAdaptiveInstance.set(instance);
                    } catch (Throwable t) {
                        createAdaptiveInstanceError = t;
                        throw new IllegalStateException("fail to create 
                                                  adaptive instance: " + t.toString(), t);
                    }
                }
            }
        }
    }
    return (T) instance;
}

getAdaptiveExtension方法首先會檢查快取,快取未命中,則呼叫 createAdaptiveExtension方法創建自適應拓展,

private T createAdaptiveExtension() {
    try {
        return injectExtension((T) getAdaptiveExtensionClass().newInstance());
    } catch (Exception e) {
        throw new IllegalStateException("
            Can not create adaptive extension " + type + ", cause: " + e.getMessage(), e);
    }
}

這里的代碼較少,呼叫 getAdaptiveExtensionClass方法獲取自適應拓展 Class 物件,然后通過反射實體化,最后呼叫injectExtension方法向拓展實體中注入依賴,

獲取自適應擴展類程序如下:

private Class<?> getAdaptiveExtensionClass() {
    //獲取當前介面的所有實作類
    //如果某個實作類標注了@Adaptive,此時cachedAdaptiveClass不為空
    getExtensionClasses();
    if (cachedAdaptiveClass != null) {
        return cachedAdaptiveClass;
    }
    //以上條件不成立,就創建自適應拓展類
    return cachedAdaptiveClass = createAdaptiveExtensionClass();
}

在上面方法中,它會先獲取當前介面的所有實作類,如果某個實作類標注了@Adaptive,那么該類就被賦值給cachedAdaptiveClass變數并回傳,如果沒有,就呼叫createAdaptiveExtensionClass創建自適應拓展類,

private Class<?> createAdaptiveExtensionClass() {
    //構建自適應拓展代碼
    String code = createAdaptiveExtensionClassCode();
    ClassLoader classLoader = findClassLoader();
    // 獲取編譯器實作類 這個Dubbo默認是采用javassist 
    Compiler compiler =ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
    //編譯代碼,回傳類實體的物件
    return compiler.compile(code, classLoader);
}

在生成自適應擴展類之前,Dubbo會檢查介面方法是否包含@Adaptive,如果方法上都沒有此注解,就要拋出例外,

if (!hasAdaptiveAnnotation){
    throw new IllegalStateException(
        "No adaptive method on extension " + type.getName() + ", 
          refuse to create the adaptive class!");
}

我們還是以Protocol介面為例,它的export()refer()方法,都標注為@AdaptivedestroygetDefaultPort未標注 @Adaptive注解,Dubbo 不會為沒有標注 Adaptive 注解的方法生成代理邏輯,對于該種型別的方法,僅會生成一句拋出例外的代碼,

package com.alibaba.dubbo.rpc;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.Adaptive;
import com.alibaba.dubbo.common.extension.SPI;

@SPI("dubbo")
public interface Protocol {
    int getDefaultPort();
    @Adaptive
    <T> Exporter<T> export(Invoker<T> invoker) throws RpcException;
    @Adaptive
    <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException;
    void destroy();
}

所以說當我們呼叫這兩個方法的時候,會先拿到URL物件中的協議名稱,再根據名稱找到具體的擴展點實作類,然后去呼叫,下面是生成自適應擴展類實體的源代碼:

package com.viewscenes.netsupervisor.adaptive;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.extension.ExtensionLoader;
import com.alibaba.dubbo.rpc.Exporter;
import com.alibaba.dubbo.rpc.Invoker;
import com.alibaba.dubbo.rpc.Protocol;
import com.alibaba.dubbo.rpc.RpcException;

public class Protocol$Adaptive implements Protocol {
    public void destroy() {
        throw new UnsupportedOperationException(
                "method public abstract void Protocol.destroy() of interface Protocol is not adaptive method!");
    }
    public int getDefaultPort() {
        throw new UnsupportedOperationException(
                "method public abstract int Protocol.getDefaultPort() of interface Protocol is not adaptive method!");
    }
    public Exporter export(Invoker invoker)throws RpcException {
        if (invoker == null) {
            throw new IllegalArgumentException("Invoker argument == null");
        }
        if (invoker.getUrl() == null) {
            throw new IllegalArgumentException("Invoker argument getUrl() == null");
        }
            
        URL url = invoker.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(Protocol) name from url("
                    + url.toString() + ") use keys([protocol])");
        }
            
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.export(invoker);
    }
    public Invoker refer(Class clazz,URL ur)throws RpcException {
        if (ur == null) {
            throw new IllegalArgumentException("url == null");
        }
        URL url = ur;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null) {
            throw new IllegalStateException("Fail to get extension(Protocol) name from url("+ url.toString() + ") use keys([protocol])");
        }
        Protocol extension = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(extName);
        return extension.refer(clazz, url);
    }
}

綜上所述,當我們獲取某個介面的自適應擴展類,實際就是一個Adaptive類實體,

ExtensionLoader<Protocol> extensionLoader = ExtensionLoader.getExtensionLoader(Protocol.class);            
Protocol adaptiveExtension = extensionLoader.getAdaptiveExtension();
System.out.println(adaptiveExtension);

輸出為:

com.alibaba.dubbo.rpc.Protocol$Adaptive@47f6473

在這里插入圖片描述

5.自動激活擴展點機制

自動激活擴展點,有點類似springboot 用到的 conditional,根據條件進行自動激活,但是這里設計的初衷是,對于一個類會加載多個擴展點的實作,這個時候可以通過自動激活擴展點進行動態加載, 從而簡化配置我們的配置,

@Activate 提供了一些配置來允許我們配置加載條件,比如 group 過濾,比如 key 過濾,

我們可以看看 org.apache.dubbo.Filter 這個類,它有非常多的實作,比如說 CacheFilter,這個快取過濾器,配置資訊如下:

group 表示客戶端和和服務端都會加載,value 表示 url 中有 cache_key 的時候

@Activate(group = {CONSUMER, PROVIDER}, value = CACHE_KEY)
public class CacheFilter implements Filter {

通過下面這段代碼,演示關于 Filter 的自動激活擴展點的效果,沒有添加“注釋代碼”時,list 的結果是 10,添加之后 list激活擴展點的效果,沒有添加“注釋代碼”時,list 的結果是 10,添加之后 list,會自動把 cacheFilter 加載進來,

	public static void main(String[] args) {
		SpringApplication.run(DubboProviderApplication.class, args);


		/*Protocol protocol = ExtensionLoader.getExtensionLoader(Protocol.class).getExtension("myProtocol");
		System.out.println(protocol.getDefaultPort());*/

		/*Compiler compiler = ExtensionLoader.getExtensionLoader(Compiler.class).getAdaptiveExtension();
		//org.apache.dubbo.common.compiler.support.AdaptiveCompiler
		System.out.println("compiler.getClass() = " + compiler.getClass());*/

		ExtensionLoader<Filter> extensionLoader = ExtensionLoader.getExtensionLoader(Filter.class);
		URL url = new URL("", "", 0);
		List<Filter> filters = extensionLoader.getActivateExtension(//url.addParameter("cache","cache"), "cache");
		System.out.println("filters.size() = " + filters.size());
	}

這個方法的底層邏輯其實就是先獲取到所有對應的激活擴展類,在拿到URL,根據 @Activate 獲取到對應的擴展類組合在一起回傳,

十一,Dubbo原理-框架設計

config 配置層:對外配置介面,以 ServiceConfig, ReferenceConfig 為中心,可以直接初始化配置類,也可以通過 spring 決議配置生成配置類

proxy 服務代理層:服務介面透明代理,生成服務的客戶端 Stub 和服務器端 Skeleton, 以 ServiceProxy 為中心,擴展介面為 ProxyFactory

registry 注冊中心層:封裝服務地址的注冊與發現,以服務 URL 為中心,擴展介面為 RegistryFactory, Registry, RegistryService

cluster 路由層:封裝多個提供者的路由及負載均衡,并橋接注冊中心,以 Invoker 為中心,擴展介面為 Cluster, Directory, Router, LoadBalance

monitor 監控層:RPC 呼叫次數和呼叫時間監控,以 Statistics 為中心,擴展介面為 MonitorFactory, Monitor, MonitorService

protocol 遠程呼叫層:封裝 RPC 呼叫,以 Invocation, Result 為中心,擴展介面為 Protocol, Invoker, Exporter

exchange 資訊交換層:封裝請求回應模式,同步轉異步,以 Request, Response 為中心,擴展介面為 Exchanger, ExchangeChannel, ExchangeClient, ExchangeServer

transport 網路傳輸層:抽象 mina 和 netty 為統一介面,以 Message 為中心,擴展介面為 Channel, Transporter, Client, Server, Codec

serialize 資料序列化層:可復用的一些工具,擴展介面為 Serialization, ObjectInput, ObjectOutput, ThreadPool

十二,服務暴露

分析:如果需要完成服務發布預注冊,需要實作哪些事情?

  • 決議組態檔或注解
  • 服務注冊
  • 啟動netty服務實作遠程監聽

1.dubbo對于spring的擴展

1)spring的標簽擴展

在 spring 中定義了兩個介面

  • NamespaceHandler: 注冊一堆 BeanDefinitionParser,利用他們來進行決議
  • BeanDefinitionParser:用于決議每個 element 的內容

Spring 默認會加載 jar 包下的 META-INF/spring.handlers 檔案尋找對應的 NamespaceHandler, Dubbo-config 模塊下的 dubbo-config-spring就含有這個檔案,

2)dubbo的接入實作

Dubbo 中 spring 擴展就是使用 spring 的自定義型別,所以同樣也有 NamespaceHandler、BeanDefinitionParser,而NamespaceHandler 是 DubboNamespaceHandler,

public class DubboNamespaceHandler extends NamespaceHandlerSupport implements ConfigurableSourceBeanMetadataElement {

    static {
        Version.checkDuplicate(DubboNamespaceHandler.class);
    }

    @Override
    public void init() {
        registerBeanDefinitionParser("application", new DubboBeanDefinitionParser(ApplicationConfig.class, true));
        registerBeanDefinitionParser("module", new DubboBeanDefinitionParser(ModuleConfig.class, true));
        registerBeanDefinitionParser("registry", new DubboBeanDefinitionParser(RegistryConfig.class, true));
        registerBeanDefinitionParser("config-center", new DubboBeanDefinitionParser(ConfigCenterBean.class, true));
        registerBeanDefinitionParser("metadata-report", new DubboBeanDefinitionParser(MetadataReportConfig.class, true));
        registerBeanDefinitionParser("monitor", new DubboBeanDefinitionParser(MonitorConfig.class, true));
        registerBeanDefinitionParser("metrics", new DubboBeanDefinitionParser(MetricsConfig.class, true));
        registerBeanDefinitionParser("ssl", new DubboBeanDefinitionParser(SslConfig.class, true));
        registerBeanDefinitionParser("provider", new DubboBeanDefinitionParser(ProviderConfig.class, true));
        registerBeanDefinitionParser("consumer", new DubboBeanDefinitionParser(ConsumerConfig.class, true));
        registerBeanDefinitionParser("protocol", new DubboBeanDefinitionParser(ProtocolConfig.class, true));
        registerBeanDefinitionParser("service", new DubboBeanDefinitionParser(ServiceBean.class, true));
        registerBeanDefinitionParser("reference", new DubboBeanDefinitionParser(ReferenceBean.class, false));
        registerBeanDefinitionParser("annotation", new AnnotationBeanDefinitionParser());
    }
}

BeanDefinitionParser 全部都使用了 DubboBeanDefinitionParser,如果我們想看 dubbo:service 的配置,就直接看 DubboBeanDefinitionParser(ServiceBean.class,true)

這個里面主要做了一件事,把不同的配置分別轉化成 spring 容器中的 bean 物件

applicationApplicationConfig
registryRegistryConfig
monitorMonitorConfig
providerProviderConfig
consumerConsumerConfig

涉及到服務發布和服務呼叫的兩個配置的決議,用的是 ServiceBean 和 referenceBean,并不是 config 結尾的,這兩個類稍微特殊些,當然他同時也繼承了 ServiceConfig 和 ReferenceConfig,

3)DubboBeanDefinitionParser

這里面是實作具體組態檔決議的入口,它重寫了 parse 方法,對 spring 的配置進行決議,我們關注一下 ServiceBean 的決議.實際就是決議 dubbo:service 這個標簽中對應的屬性,

else if (ServiceBean.class.equals(beanClass)) {
            String className = resolveAttribute(element, "class", parserContext);
            if (StringUtils.isNotEmpty(className)) {
                RootBeanDefinition classDefinition = new RootBeanDefinition();
                classDefinition.setBeanClass(ReflectUtils.forName(className));
                classDefinition.setLazyInit(false);
                parseProperties(element.getChildNodes(), classDefinition, parserContext);
                beanDefinition.getPropertyValues().addPropertyValue("ref", new BeanDefinitionHolder(classDefinition, id + "Impl"));
            }
        }

4)ServiceBean的實作

ServiceBean 這個類,分別實作了 InitializingBean, DisposableBean, ApplicationContextAware, ApplicationListener,BeanNameAware, ApplicationEventPublisherAware

①InitializingBean

介面為 bean 提供了初始化方法的方式,它只包括 afterPropertiesSet 方法,凡是繼承該介面的類,在初始化 bean 的時候會執行該方法,被重寫的方法為 afterPropertiesSet,

②DisposableBean

被重寫的方法為 destroy ,bean 被銷毀的時候,spring 容器會自動執行 destory 方法,比如釋放資源,

③ApplicationContextAware

實作了這個介面的 bean,當 spring 容器初始化的時候,會自動的將 ApplicationContext 注入進來,

④ApplicationListener

ApplicationEvent 事件監聽,spring 容器啟動后會發一個事件通知,被重寫的方法為: onApplicationEvent ,onApplicationEvent方法傳入的物件是 ContextRefreshedEvent,這個物件是當 Spring 的背景關系被重繪或者加載完畢的時候觸發的,因此服務就是在Spring 的背景關系重繪后進行匯出操作的,

⑤BeanNameAware

獲得自身初始化時,本身的 bean 的 id 屬性,被重寫的方法為 setBeanName,

⑥ApplicationEventPublisherAware

這個是一個異步事件發送器,被重寫的方法為 setApplicationEventPublisher ,簡單來說,在 spring 里面提供了類似于訊息佇列的異步事件解耦功能,(典型的觀察者模式的應用),

⑦spring 事件發送監聽由 3 個部分組成

  • ApplicationEvent:表示事件本身,自定義事件需要繼承該類
  • ApplicationEventPublisherAware:事件發送器,需要實作該介面
  • ApplicationListener:事件監聽器介面

5)總結-dubbo啟動決議,加載組態檔

在dubbo的META-INF目錄下有一個 spring.handlers 組態檔,容器啟動就會加載這個組態檔,

這個組態檔里面注冊了一個 Bean 叫做 DubboNamespaceHandler ,這個 bean 就是dubbo 組態檔處理器,

在這個 bean里面有一個init(),這個方法里面會加載很多的 DubboBeanDefinitionParser ,DubboBeanDefinitionParser就是組態檔決議器,(除了 service 和 reference 對應的類叫 ServiceBean 和 ReferenceBean ,其他對應標簽對應的類都是xxxConfig),

DubboBeanDefinitionParser 的parse() 就是對標簽的決議,他會判斷這個標簽所屬的型別,然后根據標簽的配置進行屬性填充,最終將所有的bean注冊到配置中心,

2.ServiceBean 中服務暴露程序

在 ServiceBean 中,我們暫且只需要關注兩個方法,分別是:

  • 在初始化 bean 的時候會執行該方法 afterPropertiesSet
  • spring 容器啟動后會發一個事件通知 onApplicationEvent

1)afterPropertiesSet

這個方法里面,就是把 dubbo 中配置的 application 、 registry 、 service 、 protocol 等資訊,加載到對應的 config物體中,便于后續的使用,

    @Override
    public void afterPropertiesSet() throws Exception {
        if (StringUtils.isEmpty(getPath())) {
            if (StringUtils.isNotEmpty(getInterface())) {
                setPath(getInterface());
            }
        }
    }

2)onApplicationEvent

spring 容器啟動之后,會收到一個這樣的事件通知,這里面做了兩個事情

  • 判斷服務是否已經發布過
  • 如果沒有發布,則呼叫呼叫 export 進行服務發布的流程 入口
public void onApplicationEvent(ContextRefreshedEvent event) {
    if (!isExported() && !isUnexported()) {
        if (logger.isInfoEnabled()) {
        	logger.info("The service ready on spring started. service: " + getInterface());
        }
        export();
    }
}

3)export

serviceBean 中,重寫了 export 方法,實作了 一個事件的發布,并且呼叫了 super.export() ,也就是會呼叫父類的 export 方法,

    @Override
    public void exported() {
        super.exported();
        // Publish ServiceBeanExportedEvent
        publishExportEvent();
    }

3.ServiceConfig配置類

所有的配置它都放在了一個 AbstractServiceConfig 的抽象類,自己實作了很多對于服務發布之前要做的操作邏輯,

1)export

    public synchronized void export() {
        // 當前的服務是否需要發布 ,  通過配置實作: @Service(export = false)
        if (!shouldExport()) {
            return;
        }
		//2.7.5版本新增,多了一個Dubbo的引導類
        if (bootstrap == null) {
            //獲取引導類實體
            bootstrap = DubboBootstrap.getInstance();
            //呼叫引導類的初始化方法
            bootstrap.initialize();
        }
		//檢查并更新配置
        checkAndUpdateSubConfigs();

        //初始化元資料中心 , 2.75 版本新增 ,用來存盤配置資訊,減輕配置中心壓力
        serviceMetadata.setVersion(getVersion());
        serviceMetadata.setGroup(getGroup());
        serviceMetadata.setDefaultGroup(getGroup());
        serviceMetadata.setServiceType(getInterfaceClass());
        serviceMetadata.setServiceInterfaceName(getInterface());
        serviceMetadata.setTarget(getRef());
		// 檢查是否需要延時發布,通過配置 @Service(delay = 1000) 實作,單位毫秒
        if (shouldDelay()) {
            // 這里的延時是通過定時器來實作
            DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
        } else {
            // 如果沒有配置 delay ,則直接呼叫 export 進行發布
            doExport();
        }
		//發布一個事件  TODO
        exported();
    }

2)doExport

這里仍然還是在實作發布前的各種判斷

    protected synchronized void doExport() {
        if (unexported) {
            throw new IllegalStateException("The service " + interfaceClass.getName() + " has already unexported!");
        }
        // 服務是否已經發布過了
        if (exported) {
            return;
        }
        exported = true;// 設定發布狀態

        if (StringUtils.isEmpty(path)) {
            path = interfaceName;//path 表示服務路徑,默認使用 interfaceName
        }
        doExportUrls();
    }

3)doExportUrls

  • 加載所有配置的注冊中心地址
  • 遍歷所有配置的協議,protocols
  • 針對每種協議發布一個對應協議的服務
    private void doExportUrls() {
        ServiceRepository repository = ApplicationModel.getServiceRepository();
        ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
        repository.registerProvider(
                getUniqueServiceName(),
                ref,
                serviceDescriptor,
                this,
                serviceMetadata
        );
		// 加載所有配置的注冊中心的地址,組裝成一個 URL集合
        List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
		
        for (ProtocolConfig protocolConfig : protocols) {
            //group 跟 version 組成一個 pathKey(serviceName)
            String pathKey = URL.buildKey(getContextPath(protocolConfig)
                    .map(p -> p + "/" + path)
                    .orElse(path), group, version);
            // 服務注冊   TODO TODO TODO 重要的事情標記三遍
            repository.registerService(pathKey, interfaceClass);
            serviceMetadata.setServiceKey(pathKey);
            //發布指定協議的服務
            doExportUrlsFor1Protocol(protocolConfig, registryURLs);
        }
    }

4) doExportUrlsFor1Protocol

發布指定協議的服務,以 Dubbo 服務為例,

// export service  發布服務
//獲取當前要發布服務的IP和埠
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
//組裝URL
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);

// You can customize Configurator to append extra parameters
//通過 ConfiguratorFactory 去實作動態改變配置的功能 ,動態配置 TODO
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
    .hasExtension(url.getProtocol())) {
    url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
        .getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}

String scope = url.getParameter(SCOPE_KEY);
// don't export when none is configured
//如果 scope!="none"則發布服務,默認 scope 為 null,
//如果 scope 不為 none,判斷是否為 local 或 remote,
//從而發布 Local 服務或 Remote 服務,默認兩個都會發布
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {

    // export to local if the config is not remote (export to remote only when config is remote)
    if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
        //發布本地服務  injvm
        //服務只是 injvm 的服務,提供一種消費者和提供者都在一個 jvm 內的呼叫方式,使用了 Injvm 協議,是一個偽協議,它不開啟埠,不發起遠程呼叫,只在 JVM 內直接關聯,(通過集合的方式保存了發布的服務資訊),但執行 Dubbo 的 Filter 鏈,簡單來說,就是你本地的 dubbo 服務呼叫,都依托于 dubbo 的標準來進行,這樣可以享受到 dubbo 的一些配置服務,
        exportLocal(url);
    }
    // export to remote if the config is not local (export to local only when config is local)
    if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
        if (CollectionUtils.isNotEmpty(registryURLs)) {
            for (URL registryURL : registryURLs) {
                //if protocol is only injvm ,not register
                if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
                    continue;
                }
                url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
                URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
                if (monitorUrl != null) {
                    url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
                }
                if (logger.isInfoEnabled()) {
                    if (url.getParameter(REGISTER_KEY, true)) {
                        logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
                    } else {
                        logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
                    }
                }

                // For providers, this is used to enable custom proxy to generate invoker
                String proxy = url.getParameter(PROXY_KEY);
                if (StringUtils.isNotEmpty(proxy)) {
                    registryURL = registryURL.addParameter(PROXY_KEY, proxy);
                }
			//Invoker 是一個代理類,它是 Dubbo 的核心模型,其它模型都向它靠擾,或轉換成它,它代表一個可執行體,可向它發起invoke 呼叫,它有可能是一個本地的實作,也可能是一個遠程的實作,也可能一個集群實作,
                Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
                //因為 2.7 引入了元資料,所以這里對 invoker 做了委托,把 invoker 交給DelegateProviderMetaDataInvoker 來處理,
                DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
			//發布代理
                Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
                //添加到 exporters 集合
                exporters.add(exporter);
            }
        } else {
            if (logger.isInfoEnabled()) {
                logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
            }
            Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
            DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);

            Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
            exporters.add(exporter);
        }

5)PROTOCOL.export

protocol.export,這個 protocol 是什么呢?找到定義處發現它是一個自適應擴展點,打開 Protocol 這個擴展點,又可以看到它是一個在方法層面上的自適應擴展,意味著它實作了對于 export 這個方法的適配,也就意味著這個 Protocol 是一個動態代理類,Protocol$Adaptive

這個動態代理類,會根據 url 中配置的 protocol name 來實作對應協議的適配,

private static final Protocol PROTOCOL = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension();

6)Protocol$Adaptive

在當前的場景中,protocol 會是呼叫誰呢?目前發布的 invoker(URL),實際上是一個 registry://協議,所以Protocol$Adaptive,會通過 getExtension(extName)得到一個 RegistryProtocol,

public class Protocol$Adaptive implements org.apache.dubbo.rpc.Protocol {
    public void destroy() {
        throw new UnsupportedOperationException("The method public abstract void org.apache.dubbo.rpc.Protocol.destroy()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
    }

    public int getDefaultPort() {
        throw new UnsupportedOperationException("The method public abstract int org.apache.dubbo.rpc.Protocol.getDefaultPort()of interface org.apache.dubbo.rpc.Protocol is not adaptive method !");
    }

    public org.apache.dubbo.rpc.Exporter export(org.apache.dubbo.rpc.Invoker arg0) throws org.apache.dubbo.rpc.RpcException {
        if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument == null");
        if (arg0.getUrl() == null)
            throw new IllegalArgumentException("org.apache.dubbo.rpc.Invoker argument getUrl() == null");
        org.apache.dubbo.common.URL url = arg0.getUrl();
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
        //獲取擴展類
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        //擴展類執行服務暴露方法
        return extension.export(arg0);
    }

    public org.apache.dubbo.rpc.Invoker refer(java.lang.Class arg0, org.apache.dubbo.common.URL arg1) throws org.apache.dubbo.rpc.RpcException {
        if (arg1 == null) throw new IllegalArgumentException("url == null");
        org.apache.dubbo.common.URL url = arg1;
        String extName = (url.getProtocol() == null ? "dubbo" : url.getProtocol());
        if (extName == null)
            throw new IllegalStateException("Failed to get extension (org.apache.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys ([protocol])");
        org.apache.dubbo.rpc.Protocol extension = (org.apache.dubbo.rpc.Protocol) ExtensionLoader.getExtensionLoader(org.apache.dubbo.rpc.Protocol.class).getExtension(extName);
        return extension.refer(arg0, arg1);
    }
}

4.RegistryProtocol.export

很明顯,這個 RegistryProtocol 是用來實作服務注冊的,這里面會有很多處理邏輯,

  • 實作對應協議的服務發布
  • 實作服務注冊
  • 訂閱服務重寫
    @Override
    public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
        // 這里獲得的是 zookeeper 注冊中心的 url: zookeeper://ip:port
        URL registryUrl = getRegistryUrl(originInvoker);
        // url to export locally
        // 這里是獲得服務提供者的 url, dubbo://ip:port...
        URL providerUrl = getProviderUrl(originInvoker);

        // 訂閱 override 資料,在 admin 控制臺可以針對服務進行治理,比如修改權重,修改路由機制等,當注冊中心有此服務的覆寫配置注冊進來時,推送訊息給提供者,重新暴露服務
        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);

        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
        //========================================================================//
        // 這里就交給了具體的協議去暴露服務,本質上這里就是啟動一個netty
        final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);

        //  根據 invoker 中的 url 獲取 Registry 實體 : zookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // 獲取要注冊到注冊中心的 URL: dubbo://ip:port
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        // 是否配置了注冊中心,如果是, 則需要注冊
        if (register) {
            register(registryUrl, registeredProviderUrl);
        }

        // 設定注冊中心的訂閱
        registerStatedUrl(registryUrl, registeredProviderUrl, register);


        exporter.setRegisterUrl(registeredProviderUrl);
        exporter.setSubscribeUrl(overrideSubscribeUrl);

        // Deprecated! Subscribe to override rules in 2.6.x or before.
        registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

        notifyExport(exporter);
        // 保證每次 export 都回傳一個新的 exporter 實體
        return new DestroyableExporter<>(exporter);
    }

1)doLocalExport

先通過 doLocalExport 來暴露一個服務,本質上應該是啟動一個通信服務,主要的步驟是將本地 ip 和 20880 埠打開,進行監聽originInvoker: 應該是 registry://ip:port/com.alibaba.dubbo.registry.RegistryService

key: 從 originInvoker 中獲得發布協議的 url: dubbo://ip:port/...

bounds: 一個 prviderUrl 服務 export 之后,快取到 bounds 中,所以一個 providerUrl 只會對應一個 exporter

//computeIfAbsent 就相當于, java8 的語法
if(bounds.get(key)==null){
	bounds.put(key,s->{})
}
    private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
        String key = getCacheKey(originInvoker);

        return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
            // 對原有的 invoker, 委托給了 InvokerDelegate
            Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
            // 將 invoker 轉換為 exporter 并啟動 netty 服務 ----》DubboProtocol
            return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
        });
    }

InvokerDelegete: 是 RegistryProtocol 的一個靜態內部類,該類是一個 originInvoker 的委托類,該類存盤了 originInvoker,其父類 InvokerWrapper 還會存盤 providerUrl,InvokerWrapper 會呼叫 originInvoker 的 invoke 方法,也會銷毀 invoker,可以管理 invoker 的生命周期,

5.DubboProtocol.export

基于動態代理的適配,很自然的就過渡到了 DubboProtocol 這個協議類中,但是實際上是 DubboProtocol 嗎?

這里并不是獲得一個單純的 DubboProtocol 擴展點,而是會通過 Wrapper 對 Protocol 進行裝飾,裝飾器分別為:

QosProtocolWrapper/ProtocolListenerWrapper/ProtocolFilterWrapper/DubboProtocol

為什么會這樣?回頭看SPI機制,

1)Wrapper包裝

在 ExtensionLoader.loadClass 這個方法中,有一段這樣的判斷,如果當前這個類是一個 wrapper 包裝類,也就是這個 wrapper中有構造方法,引數是當前被加載的擴展點的型別,則把這個 wrapper 類加入到 cacheWrapperClass 快取中,

    private void loadClass(Map<String, Class<?>> extensionClasses, java.net.URL resourceURL, Class<?> clazz, String name,
                           boolean overridden) throws NoSuchMethodException {
        if (!type.isAssignableFrom(clazz)) {
            throw new IllegalStateException("Error occurred when loading extension class (interface: " +
                    type + ", class line: " + clazz.getName() + "), class "
                    + clazz.getName() + " is not subtype of interface.");
        }
        if (clazz.isAnnotationPresent(Adaptive.class)) {
            cacheAdaptiveClass(clazz, overridden);
        } else if (isWrapperClass(clazz)) {
            //如果當前這個類是一個 wrapper 包裝類,也就是這個 wrapper中有構造方法   引數是當前被加載的擴展點的型別
            //則把這個 wrapper 類加入到 cacheWrapperClass 快取中,
            cacheWrapperClass(clazz);
        } else {
            clazz.getConstructor();
            if (StringUtils.isEmpty(name)) {
                name = findAnnotationName(clazz);
                if (name.length() == 0) {
                    throw new IllegalStateException("No such extension name for the class " + clazz.getName() + " in the config " + resourceURL);
                }
            }

            String[] names = NAME_SEPARATOR.split(name);
            if (ArrayUtils.isNotEmpty(names)) {
                cacheActivateClass(clazz, names[0]);
                for (String n : names) {
                    cacheName(clazz, n);
                    saveInExtensionClass(extensionClasses, clazz, n, overridden);
                }
            }
        }
    }
    private boolean isWrapperClass(Class<?> clazz) {
        try {
            clazz.getConstructor(type);
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

我們可以在 dubbo 的組態檔中找到三個 Wrapper

QosprotocolWrapper:如果當前配置了注冊中心,則會啟動一個 Qos ,與在線運維相關,

ProtocolFilterWrapper:對 invoker 進行 filter 的包裝,實作請求的過濾,

ProtocolListenerWrapper:用于服務 export 時候插入監聽機制,暫未實作

接著,在 getExtension->createExtension 方法中,會對 cacheWrapperClass 集合進行判斷,如果集合不為空,則進行包裝

            if (wrap) {

                List<Class<?>> wrapperClassesList = new ArrayList<>();
                if (cachedWrapperClasses != null) {
                    wrapperClassesList.addAll(cachedWrapperClasses);
                    wrapperClassesList.sort(WrapperComparator.COMPARATOR);
                    Collections.reverse(wrapperClassesList);
                }

                if (CollectionUtils.isNotEmpty(wrapperClassesList)) {
                    for (Class<?> wrapperClass : wrapperClassesList) {
                        Wrapper wrapper = wrapperClass.getAnnotation(Wrapper.class);
                        if (wrapper == null
                                || (ArrayUtils.contains(wrapper.matches(), name) && !ArrayUtils.contains(wrapper.mismatches(), name))) {
                            instance = injectExtension((T) wrapperClass.getConstructor(type).newInstance(instance));
                        }
                    }
                }
            }

2)ProtocolFilterWrapper

這個是一個過濾器的包裝,使用責任鏈模式,對 invoker 進行了包裝,

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        if (UrlUtils.isRegistry(invoker.getUrl())) {
            return protocol.export(invoker);
        }
        // 構建責任鏈,基于激活擴展點
        return protocol.export(buildInvokerChain(invoker, SERVICE_FILTER_KEY, CommonConstants.PROVIDER));
    }
    private static <T> Invoker<T> buildInvokerChain(final Invoker<T> invoker, String key, String group) {
        Invoker<T> last = invoker;
        List<Filter> filters = ExtensionLoader.getExtensionLoader(Filter.class).getActivateExtension(invoker.getUrl(), key, group);
         //下面代碼省略
    }

我們看如下檔案:/dubbo-rpc-api/src/main/resources/META-INF/dubbo/internal/com.alibaba.dubbo.rpc.Filter

默認提供了非常多的過濾器, 然后基于條件激活擴展點,來對 invoker 進行包裝,從而在實作遠程呼叫的時候,會經過這些filter 進行過濾,

3)DubboProtocol.export

public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();

        // 獲取服務標識,理解成服務坐標也行,由服務組名,服務名,服務版本號以及埠組成,比如
    	//${group}/copm.yhd.practice.dubbo.ISayHelloService:${version}:20880
        String key = serviceKey(url);
    	// 創建 DubboExporter
        DubboExporter<T> exporter = new DubboExporter<T>(invoker, key, exporterMap);
    	// 將 <key, exporter>  鍵值對放入快取中
        exporterMap.put(key, exporter);

        //export an stub service for dispatching event
        Boolean isStubSupportEvent = url.getParameter(STUB_EVENT_KEY, DEFAULT_STUB_EVENT);
        Boolean isCallbackservice = url.getParameter(IS_CALLBACK_SERVICE, false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter(STUB_EVENT_METHODS_KEY);
            if (stubServiceMethods == null || stubServiceMethods.length() == 0) {
                if (logger.isWarnEnabled()) {
                    logger.warn(new IllegalStateException("consumer [" + url.getParameter(INTERFACE_KEY) +
                            "], has set stubproxy support event ,but no stub methods founded."));
                }

            }
        }
		//啟動服務
        openServer(url);
    	//優化序列化
        optimizeSerialization(url);

        return exporter;
    }

4)openServer

去開啟一個服務,并且放入到快取中->在同一臺機器上(單網卡),同一個埠上僅允許啟動一個服務器實體,

    private void openServer(URL url) {
        // find server.
        //  獲取 host:port ,并將其作為服務器實體的 key ,用于標識當前的服務器實體
        String key = url.getAddress();
        //client can export a service which's only for server to invoke
        //client  也可以暴露一個只有 server 可以呼叫的服務
        boolean isServer = url.getParameter(IS_SERVER_KEY, true);
        if (isServer) {
            //是否在 serverMap 中快取了
            ProtocolServer server = serverMap.get(key);
            if (server == null) {
                synchronized (this) {
                    server = serverMap.get(key);
                    if (server == null) {
                        //創建服務器實體
                        serverMap.put(key, createServer(url));
                    }
                }
            } else {
                //服務器已創建,則根據 url  中的配置重置服務器
                // server supports reset, use together with override
                server.reset(url);
            }
        }
    }

5)createServer

創建服務,開啟心跳檢測,默認使用 netty,組裝 url,

    private ProtocolServer createServer(URL url) {
        // 組裝 url ,在 url 中添加心跳時間、編解碼引數
        url = URLBuilder.from(url)
            //  當服務關閉以后,發送一個只讀的事件,默認是開啟狀態
                // send readonly event when server closes, it's enabled by default
                .addParameterIfAbsent(CHANNEL_READONLYEVENT_SENT_KEY, Boolean.TRUE.toString())
                // enable heartbeat by default
            //  啟動心跳配置
                .addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT))
                .addParameter(CODEC_KEY, DubboCodec.NAME)
                .build();
        String str = url.getParameter(SERVER_KEY, DEFAULT_REMOTING_SERVER);
		// 通過 SPI  檢測是否存在 server  引數所代表的 Transporter  拓展,不存在則拋出例外
        if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
            throw new RpcException("Unsupported server type: " + str + ", url: " + url);
        }
		// 創建 ExchangeServer
        ExchangeServer server;
        try {
            server = Exchangers.bind(url, requestHandler);
        } catch (RemotingException e) {
            throw new RpcException("Fail to start server(url: " + url + ") " + e.getMessage(), e);
        }

        str = url.getParameter(CLIENT_KEY);
        if (str != null && str.length() > 0) {
            Set<String> supportedTypes = ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions();
            if (!supportedTypes.contains(str)) {
                throw new RpcException("Unsupported client type: " + str);
            }
        }

        return new DubboProtocolServer(server);
    }

6)Exchangers.bind

    public static ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handler == null) {
            throw new IllegalArgumentException("handler == null");
        }
        // 獲取 Exchanger ,默認為 HeaderExchanger 
        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
        // 呼叫 HeaderExchanger  的 bind  方法創建 ExchangeServer  實體
        return getExchanger(url).bind(url, handler);
    }

7)headerExchanger.bind

這里面包含多個邏輯

  • new DecodeHandler(new HeaderExchangeHandler(handler))
  • Transporters.bind
  • new HeaderExchangeServer

目前只需要關心 transporters.bind 方法即可

public ExchangeServer bind(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeServer(Transporters.bind(url, new DecodeHandler(new HeaderExchangeHandler(handler))));
}

8)Transporters.bind

    public static RemotingServer bind(URL url, ChannelHandler... handlers) throws RemotingException {
        if (url == null) {
            throw new IllegalArgumentException("url == null");
        }
        if (handlers == null || handlers.length == 0) {
            throw new IllegalArgumentException("handlers == null");
        }
        ChannelHandler handler;
        if (handlers.length == 1) {
            handler = handlers[0];
        } else {
            //  如果 handlers  元素數量大于 1 ,則創建 ChannelHandler  分發器
            handler = new ChannelHandlerDispatcher(handlers);
        }
        //  獲取自適應 Transporter  實體,并呼叫實體方法
        return getTransporter().bind(url, handler);
    }

9)getTransporter

getTransporter 是一個自適應擴展點,它針對 bind 方法添加了自適應注解,意味著,bing 方法的具體實作,會基于Transporter$Adaptive 方法進行適配,那么在這里面默認的通信協議是 netty,所以它會采用 netty4 的實作,也就是org.apache.dubbo.remoting.transport.netty4.NettyTransporter

10)NettyTransporter.bind

public RemotingServer bind(URL url, ChannelHandler handler) throws RemotingException {
    return new NettyServer(url, handler);
}

6.NettyServer

初始化一個 nettyserver,并且從 url 中獲得相應的 ip/ port,然后呼叫 doOpen();

    public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        // you can customize name and type of client thread pool by THREAD_NAME_KEY and THREADPOOL_KEY in CommonConstants.
        // the handler will be wrapped: MultiMessageHandler->HeartbeatHandler->handler
        super(ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME), ChannelHandlers.wrap(handler, url));
    }
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
    super(url, handler);
    localAddress = getUrl().toInetSocketAddress();
	//獲取IP
    String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
    //獲取埠
    int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
    if (url.getParameter(ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
        bindIp = ANYHOST_VALUE;
    }
    bindAddress = new InetSocketAddress(bindIp, bindPort);
    this.accepts = url.getParameter(ACCEPTS_KEY, DEFAULT_ACCEPTS);
    this.idleTimeout = url.getParameter(IDLE_TIMEOUT_KEY, DEFAULT_IDLE_TIMEOUT);
    try {
        //  呼叫模板方法 doOpen  啟動服務器
        doOpen();
        if (logger.isInfoEnabled()) {
            logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
        }
    } catch (Throwable t) {
        throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
    }
    executor = executorRepository.createExecutorIfAbsent(url);
}

1)doOpen

開啟 netty 服務

    protected void doOpen() throws Throwable {
        bootstrap = new ServerBootstrap();

        bossGroup = NettyEventLoopFactory.eventLoopGroup(1, "NettyServerBoss");
        workerGroup = NettyEventLoopFactory.eventLoopGroup(
                getUrl().getPositiveParameter(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                "NettyServerWorker");

        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);
        channels = nettyServerHandler.getChannels();

        bootstrap.group(bossGroup, workerGroup)
                .channel(NettyEventLoopFactory.serverSocketChannelClass())
                .option(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<SocketChannel>() {
                    @Override
                    protected void initChannel(SocketChannel ch) throws Exception {
                        // FIXME: should we use getTimeout()?
                        int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        if (getUrl().getParameter(SSL_ENABLED_KEY, false)) {
                            ch.pipeline().addLast("negotiation",
                                    SslHandlerInitializer.sslServerHandler(getUrl(), nettyServerHandler));
                        }
                        ch.pipeline()
                                .addLast("decoder", adapter.getDecoder())
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("server-idle-handler", new IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();

    }

這里用到了一個 handler 來處理客戶端傳遞過來的請求NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);

這個 handler 是一個鏈路,后續接收到的請求,會一層一層的處理,

7.Invoker是什么

從前面的分析來看,服務的發布分三個階段

  • 創造一個 invoker
  • 把經歷過一系列處理的 invoker(各種包裝),在 DubboProtocol 中保存到 exporterMap 中
  • 把 dubbo 協議的 url 地址注冊到注冊中心上

Invoker 是 Dubbo 領域模型中非常重要的一個概念, 和 ExtensionLoader 的重要性是一樣的,回到 ServiceConfig 中 export 的代碼,這段代碼是還沒有分析過的,以這個作為入口來分析export 出去的 invoker 到底是啥東西,

Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);

1)ProxyFacotory.getInvoker

這個是一個代理工程,用來生成 invoker,從它的定義來看,它是一個自適應擴展點,看到這樣的擴展點,我們幾乎可以不假思索的想到它會存在一個動態配接器類,

private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

2)ProxyFactory

這個方法的簡單解讀為: 它是一個 spi 擴展點,并且默認的擴展實作是 javassit, 這個介面中有三個方法,并且都是加了@Adaptive 的自適應擴展點,所以如果呼叫 getInvoker 方法,應該會回傳一個 ProxyFactory$Adaptiv,

@SPI("javassist")
public interface ProxyFactory {

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> T getProxy(Invoker<T> invoker, boolean generic) throws RpcException;

    @Adaptive({PROXY_KEY})
    <T> Invoker<T> getInvoker(T proxy, Class<T> type, URL url) throws RpcException;

}

3)ProxyFactory$Adaptive

這個自適應擴展點,做了兩件事情

  • 通過 ExtensionLoader.getExtensionLoader(ProxyFactory.class).getExtension(extName)獲取了一個指定名稱的擴展點
    • dubbo-rpc-api/resources/META-INF/com.alibaba.dubbo.rpc.ProxyFactory 中,定義了 javassis=JavassisProxyFactory
  • 呼叫 JavassisProxyFactorygetInvoker 方法

javassist 是一個動態類別庫,用來實作動態代理的,構建好了代理類之后,回傳一個 AbstractproxyInvoker,并且它實作了 doInvoke 方法,這個地方似乎看到了 dubbo 消費者呼叫過來的時候觸發的影子,因為 wrapper.invokeMethod 本質上就是觸發上面動態代理類的方法 invokeMethod,

總結一下 Invoke 本質上應該是一個代理,經過層層包裝最終進行了發布,當消費者發起請求的時候,會獲得這個invoker 進行呼叫,最終發布出去的 invoker, 也不是單純的一個代理,也是經過多層包裝,

在這里插入圖片描述

十三,服務注冊

1.服務注冊的核心邏輯

服務發布這一條線分析完成之后,再來了解一下服務注冊的程序,

在看服務發布的原始碼的時候,在RegistryProtocol這個類中,看到了服務發布的流程,

從export方法中抽離出來的部分代碼,就是服務注冊的流程,

	   //  根據 invoker 中的 url 獲取 Registry 實體 : zookeeperRegistry
        final Registry registry = getRegistry(originInvoker);
        // 獲取要注冊到注冊中心的 URL: dubbo://ip:port
        final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);

        // decide if we need to delay publish
        boolean register = providerUrl.getParameter(REGISTER_KEY, true);
        // 是否配置了注冊中心,如果是, 則需要注冊
        if (register) {
            register(registryUrl, registeredProviderUrl);
        }

2.getRegistry

  • 把url轉化為對應配置的注冊中心的具體協議
  • 根據具體協議,從registryFactory中獲得指定的注冊中心實作

這個registryFactory具體是怎么賦值的呢?

    protected Registry getRegistry(final Invoker<?> originInvoker) {
        //把url轉化為配置的具體協議,比如zookeeper://ip:port. 這樣后續獲得的注冊中心就會是基于zk的實作
        URL registryUrl = getRegistryUrl(originInvoker);
        return registryFactory.getRegistry(registryUrl);
    }

在RegistryProtocol中存在一段這樣的代碼,很明顯這是通過依賴注入來實作的擴展點,

private RegistryFactory registryFactory;
public void setRegistryFactory(RegistryFactory registryFactory) {
        this.registryFactory = registryFactory;
    }

按照擴展點的加載規則,我們可以先看看/META-INF/dubbo/internal路徑下找到RegistryFactory的組態檔.這個factory有多個擴展點的實作,

dubbo=org.apache.dubbo.registry.dubbo.DubboRegistryFactory
multicast=org.apache.dubbo.registry.multicast.MulticastRegistryFactory
zookeeper=org.apache.dubbo.registry.zookeeper.ZookeeperRegistryFactory
redis=org.apache.dubbo.registry.redis.RedisRegistryFactory
consul=org.apache.dubbo.registry.consul.ConsulRegistryFactory
etcd3=org.apache.dubbo.registry.etcd.EtcdRegistryFactory

接著,找到RegistryFactory的實作, 發現它里面有一個自適應的方法,根據url中protocol傳入的值進行適配,

@SPI("dubbo")
public interface RegistryFactory {
}

3.RegistryFactory$Adaptive

由于在前面的代碼中,url中的protocol已經改成了zookeeper,那么這個時候根據zookeeper獲得的spi擴展點應該是ZookeeperRegistryFactory,

import org.apache.dubbo.common.extension.ExtensionLoader;

public class RegistryFactory$Adaptive implements org.apache.dubbo.registry.RegistryFactory {
    
  public org.apache.dubbo.registry.Registry getRegistry(org.apache.dubbo.common.URL arg0) {
    
    if (arg0 == null) throw new IllegalArgumentException("url == null");
    org.apache.dubbo.common.URL url = arg0;
    String extName = ( url.getProtocol() == null ? "dubbo" :url.getProtocol() );
    if(extName == null) throw new IllegalStateException("Failed to getextension (org.apache.dubbo.registry.RegistryFactory) name from url (" +url.toString() + ") use keys([protocol])");
    org.apache.dubbo.registry.RegistryFactory extension =(org.apache.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(org.apache.dubbo.registry.RegistryFactory.class).getExtension(extName);
    return extension.getRegistry(arg0);
 }
}

4.ZookeeperRegistryFactory

這個方法中并沒有getRegistry方法,而是在父類AbstractRegistryFactory

  • 從快取REGISTRIES中,根據key獲得對應的Registry
  • 如果不存在,則創建Registry
    @Override
    public Registry getRegistry(URL url) {
        if (destroyed.get()) {
            
            return DEFAULT_NOP_REGISTRY;
        }

        url = URLBuilder.from(url)
                .setPath(RegistryService.class.getName())
                .addParameter(INTERFACE_KEY, RegistryService.class.getName())
                .removeParameters(EXPORT_KEY, REFER_KEY)
                .build();
        String key = createRegistryCacheKey(url);
        // Lock the registry access process to ensure a single instance of the registry
        LOCK.lock();
        try {
            Registry registry = REGISTRIES.get(key);
            if (registry != null) {
                return registry;
            }
            //create registry by spi/ioc
            //創建注冊中心
            registry = createRegistry(url);
            if (registry == null) {
                throw new IllegalStateException("Can not create registry " + url);
            }
            REGISTRIES.put(key, registry);
            return registry;
        } finally {
            // Release the lock
            LOCK.unlock();
        }
    }

5.createRegistry

創建一個zookeeperRegistry,把urlzookeepertransporter作為引數傳入,

zookeeperTransporter 這個屬性也是基于依賴注入來賦值的,具體的流程就不再分析了,這個的值應該是CuratorZookeeperTransporter 表示具體使用什么框架來和zk產生連接,

    @Override
    public Registry createRegistry(URL url) {
        return new ZookeeperRegistry(url, zookeeperTransporter);
    }

6.ZookeeperRegistry

這個方法中使用了CuratorZookeeperTransport來實作zk的連接,

    public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
        super(url);
        if (url.isAnyHost()) {
            throw new IllegalStateException("registry address == null");
        }
        //獲得group名稱
        String group = url.getParameter(GROUP_KEY, DEFAULT_ROOT);
        if (!group.startsWith(PATH_SEPARATOR)) {
            group = PATH_SEPARATOR + group;
        }
        this.root = group;
        //產生一個zookeeper連接
        zkClient = zookeeperTransporter.connect(url);
        //添加zookeeper狀態變化事件
        zkClient.addStateListener((state) -> {
            if (state == StateListener.RECONNECTED) {
                
                ZookeeperRegistry.this.fetchLatestAddresses();
            } else if (state == StateListener.NEW_SESSION_CREATED) {
               
                try {
                    ZookeeperRegistry.this.recover();
                } catch (Exception e) {
                    logger.error(e.getMessage(), e);
                }
            } else if (state == StateListener.SESSION_LOST) {
                
            } else if (state == StateListener.SUSPENDED) {

            } else if (state == StateListener.CONNECTED) {

            }
        });
    }

7.register

獲取到了注冊中心之后,回到服務注冊的核心邏輯,開始呼叫register方法,去將dubbo://的協議地址注冊到zookeeper上

    private void register(URL registryUrl, URL registeredProviderUrl) {
        Registry registry = registryFactory.getRegistry(registryUrl);
        registry.register(registeredProviderUrl);
    }

這個方法會呼叫FailbackRegistry類中的register. 為什么呢?因為ZookeeperRegistry這個類中并沒有register這個方法,但是他的父類FailbackRegistry中存在register方法,而這個類又重寫了AbstractRegistry類中的register方法,所以我們可以直接定位大FailbackRegistry這個類中的register方法中

8.FailbackRegistry.register

    @Override
    public void register(URL url) {
        if (!acceptable(url)) {
            
            return;
        }
        super.register(url);
        removeFailedRegistered(url);
        removeFailedUnregistered(url);
        try {
            // 呼叫子類實作真正的服務注冊,把url注冊到zk上
            doRegister(url);
        } catch (Exception e) {
            Throwable t = e;

            // If the startup detection is opened, the Exception is thrown directly.
            // 如果開啟了啟動時檢測,則直接拋出例外
            boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                    && url.getParameter(Constants.CHECK_KEY, true)
                    && !CONSUMER_PROTOCOL.equals(url.getProtocol());
            boolean skipFailback = t instanceof SkipFailbackWrapperException;
            if (check || skipFailback) {
                if (skipFailback) {
                    t = t.getCause();
                }
                throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
            } else {
                
            }

            // Record a failed registration request to a failed list, retry regularly
            // 將失敗的注冊請求記錄到失敗串列,定時重試
            addFailedRegistered(url);
        }
    }

9.ZookeeperRegistry.doRegister

最終呼叫curator的客戶端把服務地址注冊到zk,

    @Override
    public void doRegister(URL url) {
        try {
            zkClient.create(toUrlPath(url), url.getParameter(DYNAMIC_KEY, true));
        } catch (Throwable e) {
            throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

在這里插入圖片描述

十四,服務消費

1.服務消費應該具備的邏輯

  • 生成一個代理物件–實作網路通信的細節
  • 建立通信連接–netty
  • 從zk獲取目標地址–訂閱節點變化
  • 實作負載均衡
  • 實作集群容錯
  • mock
  • 序列化

2.服務消費的入口

ReferenceAnnotationBeanPostProcessor->ReferenceBeanInvocationHandler.init->ReferenceConfig.get() //獲得一個遠程代理類

1)ReferenceConfig.get()

    public synchronized T get() {
        if (destroyed) {
            throw new IllegalStateException("The invoker of ReferenceConfig(" + url + ") has already destroyed!");
        }
        if (ref == null) {//如果當前介面的遠程代理參考為空,則進行初始化
            init();
        }
        return ref;
    }

2)init

初始化的程序,和服務發布的程序類似,會有特別多的判斷以及引數的組裝. 我們只需要關注createProxy,創建代理類的方法,

    public synchronized void init() {
        if (initialized) {
            return;
        }

        if (bootstrap == null) {
            bootstrap = DubboBootstrap.getInstance();
            bootstrap.init();
        }

        checkAndUpdateSubConfigs();

        checkStubAndLocal(interfaceClass);
        ConfigValidationUtils.checkMock(interfaceClass, this);

        Map<String, String> map = new HashMap<String, String>();
        map.put(SIDE_KEY, CONSUMER_SIDE);

        ReferenceConfigBase.appendRuntimeParameters(map);
        if (!ProtocolUtils.isGeneric(generic)) {
            String revision = Version.getVersion(interfaceClass, version);
            if (revision != null && revision.length() > 0) {
                map.put(REVISION_KEY, revision);
            }

            String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
            if (methods.length == 0) {
                logger.warn("No method found in service interface " + interfaceClass.getName());
                map.put(METHODS_KEY, ANY_VALUE);
            } else {
                map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), COMMA_SEPARATOR));
            }
        }
        map.put(INTERFACE_KEY, interfaceName);
        AbstractConfig.appendParameters(map, getMetrics());
        AbstractConfig.appendParameters(map, getApplication());
        AbstractConfig.appendParameters(map, getModule());
        // remove 'default.' prefix for configs from ConsumerConfig
        // appendParameters(map, consumer, Constants.DEFAULT_KEY);
        AbstractConfig.appendParameters(map, consumer);
        AbstractConfig.appendParameters(map, this);
        MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
        if (metadataReportConfig != null && metadataReportConfig.isValid()) {
            map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
        }
        Map<String, AsyncMethodInfo> attributes = null;
        if (CollectionUtils.isNotEmpty(getMethods())) {
            attributes = new HashMap<>();
            for (MethodConfig methodConfig : getMethods()) {
                AbstractConfig.appendParameters(map, methodConfig, methodConfig.getName());
                String retryKey = methodConfig.getName() + ".retry";
                if (map.containsKey(retryKey)) {
                    String retryValue = map.remove(retryKey);
                    if ("false".equals(retryValue)) {
                        map.put(methodConfig.getName() + ".retries", "0");
                    }
                }
                AsyncMethodInfo asyncMethodInfo = AbstractConfig.convertMethodConfig2AsyncInfo(methodConfig);
                if (asyncMethodInfo != null) {
//                    consumerModel.getMethodModel(methodConfig.getName()).addAttribute(ASYNC_KEY, asyncMethodInfo);
                    attributes.put(methodConfig.getName(), asyncMethodInfo);
                }
            }
        }

        String hostToRegistry = ConfigUtils.getSystemProperty(DUBBO_IP_TO_REGISTRY);
        if (StringUtils.isEmpty(hostToRegistry)) {
            hostToRegistry = NetUtils.getLocalHost();
        } else if (isInvalidLocalHost(hostToRegistry)) {
            throw new IllegalArgumentException("Specified invalid registry ip from property:" + DUBBO_IP_TO_REGISTRY + ", value:" + hostToRegistry);
        }
        map.put(REGISTER_IP_KEY, hostToRegistry);

        serviceMetadata.getAttachments().putAll(map);
		//創建代理
        ref = createProxy(map);

        serviceMetadata.setTarget(ref);
        serviceMetadata.addAttribute(PROXY_CLASS_REF, ref);
        ConsumerModel consumerModel = repository.lookupReferredService(serviceMetadata.getServiceKey());
        consumerModel.setProxyObject(ref);
        consumerModel.init(attributes);

        initialized = true;

        checkInvokerAvailable();

        // dispatch a ReferenceConfigInitializedEvent since 2.7.4
        dispatch(new ReferenceConfigInitializedEvent(this, invoker));
    }

3)createProxy

  • 判斷是否為本地呼叫,如果是則使用injvm協議進行呼叫
  • 判斷是否為點對點呼叫,如果是則把url保存到urls集合中,如果url為1,進入步驟4,如果urls>1,則執行5
  • 如果是配置了注冊中心,遍歷注冊中心,把url添加到urls集合,url為1,進入步驟4,如果urls>1,則執行5
  • 直連構建invoker
  • 構建invokers集合,通過cluster合并多個invoker
  • 最后呼叫 ProxyFactory 生成代理類
    private T createProxy(Map<String, String> map) {
        if (shouldJvmRefer(map)) {//判斷是否是在同一個jvm行程中呼叫
            URL url = new URL(LOCAL_PROTOCOL, LOCALHOST_VALUE, 0, interfaceClass.getName()).addParameters(map);
            invoker = REF_PROTOCOL.refer(interfaceClass, url);
            if (logger.isInfoEnabled()) {
                logger.info("Using injvm service " + interfaceClass.getName());
            }
        } else {
            urls.clear();
            //url 如果不為空,說明是點對點通信
            if (url != null && url.length() > 0) { 
                String[] us = SEMICOLON_SPLIT_PATTERN.split(url);
                if (us != null && us.length > 0) {
                    for (String u : us) {
                        URL url = URL.valueOf(u);
                        if (StringUtils.isEmpty(url.getPath())) {
                            url = url.setPath(interfaceName);
                        }
                        // 檢測 url 協議是否為 registry,若是,表明用戶想使用指定的注冊中心
                        if (UrlUtils.isRegistry(url)) {
                            // 將 map 轉換為查詢字串,并作為 refer 引數的值添加到url 中
                            urls.add(url.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        } else {
                            // 合并 url,移除服務提供者的一些配置(這些配置來源于用戶配置的 url 屬性),比如執行緒池相關配置,并保留服務提供者的部分配置,比如版本,group,時間戳等,最后將合并后的配置設定為 url 查詢字串中,
                            urls.add(ClusterUtils.mergeUrl(url, map));
                        }
                    }
                }
            } else { // assemble URL from register center's configuration
              
                if (!LOCAL_PROTOCOL.equalsIgnoreCase(getProtocol())) {
                    checkRegistry(); //校驗注冊中心的配置以及是否有必要從配置中心組裝url
                    //這里的代碼實作和服務端類似,也是根據注冊中心配置進行決議得到URL
                    //這里的URL肯定也是:registry://ip:port/org.apache.dubbo.service.RegsitryService
                    List<URL> us = ConfigValidationUtils.loadRegistries(this, false);
                    if (CollectionUtils.isNotEmpty(us)) {
                        for (URL u : us) {
                            URL monitorUrl = ConfigValidationUtils.loadMonitor(this, u);
                            if (monitorUrl != null) {
                                map.put(MONITOR_KEY, URL.encode(monitorUrl.toFullString()));
                            }
                            urls.add(u.addParameterAndEncoded(REFER_KEY, StringUtils.toQueryString(map)));
                        }
                    }
                    if (urls.isEmpty()) {//如果沒有配置注冊中心,則報錯
                        throw new IllegalStateException();
                    }
                }
            }
			//如果值配置了一個注冊中心或者一個服務提供者,直接使用refprotocol.refer
            if (urls.size() == 1) {
                invoker = REF_PROTOCOL.refer(interfaceClass, urls.get(0));
            } else {
                List<Invoker<?>> invokers = new ArrayList<Invoker<?>>();
                URL registryURL = null;
                for (URL url : urls) {//遍歷urls生成多個invoker
                    invokers.add(REF_PROTOCOL.refer(interfaceClass, url));
                    if (UrlUtils.isRegistry(url)) {
                        registryURL = url; // use last registry url
                    }
                }
                if (registryURL != null) { //如果registryUrl不為空,構建靜態directory
                    
                    String cluster = registryURL.getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME);
                    // 通過Cluster將多個invoker合并RegistryAwareClusterInvoker(StaticDirectory) ->FailoverClusterInvoker(RegistryDirectory, will execute route) -> Invoker
                    invoker = Cluster.getCluster(cluster, false).join(new StaticDirectory(registryURL, invokers));
                } else { // not a registry url, must be direct invoke.
                    String cluster = CollectionUtils.isNotEmpty(invokers)
                            ? (invokers.get(0).getUrl() != null ? invokers.get(0).getUrl().getParameter(CLUSTER_KEY, ZoneAwareCluster.NAME) : Cluster.DEFAULT)
                            : Cluster.DEFAULT;
                    invoker = Cluster.getCluster(cluster).join(new StaticDirectory(invokers));
                }
            }
        }

        if (logger.isInfoEnabled()) {
            logger.info("Refer dubbo service " + interfaceClass.getName() + " from url " + invoker.getUrl());
        }
        /**
         * @since 2.7.0
         * ServiceData Store
         */
        String metadata = map.get(METADATA_KEY);
        WritableMetadataService metadataService = WritableMetadataService.getExtension(metadata == null ? DEFAULT_METADATA_STORAGE_TYPE : metadata);
        if (metadataService != null) {
            URL consumerURL = new URL(CONSUMER_PROTOCOL, map.remove(REGISTER_IP_KEY), 0, map.get(INTERFACE_KEY), map);
            metadataService.publishServiceDefinition(consumerURL);
        }
        // create service proxy
        return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));
    }

4)protocol.refer

這里通過指定的協議來呼叫refer生成一個invoker物件,invoker前面看過,它是一個代理物件,那么在當前的消費端而言,invoker主要用于執行遠程呼叫,

這個protocol,又是一個自適應擴展點,它得到的是一個Protocol$Adaptive.

根據當前的協議url,得到一個指定的擴展點,傳遞進來的引數中,協議地址為registry://,所以,我們可以直接定位到RegistryProtocol.refer代碼

5)RegistryProtocol.refer

  • 組裝注冊中心協議的url
  • 判斷是否配置了group,如果有,則cluster=getMergeableCluster(),構建invoker
  • doRefer構建invoker
    public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
        //根據配置的協議,生成注冊中心的url: zookeeper://
        url = getRegistryUrl(url);
        //獲取注冊中心
        Registry registry = registryFactory.getRegistry(url);
        if (RegistryService.class.equals(type)) {
            return proxyFactory.getInvoker((T) registry, type, url);
        }

        // group="a,b" or group="*"
        // 決議group引數,根據group決定cluster的型別
        Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
        String group = qs.get(GROUP_KEY);
        if (group != null && group.length() > 0) {
            if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
                return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
            }
        }

        Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
        return doRefer(cluster, registry, type, url);
    }

6)doRefer

  • 構建一個RegistryDirectory
  • 構建一個consumer://協議的地址注冊到注冊中心
  • 訂閱zookeeper中節點的變化
  • 呼叫cluster.join方法
    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        //RegistryDirectory初始化
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        //注冊consumer://協議的url
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        //訂閱事件監聽
        directory.subscribe(toSubscribeUrl(subscribeUrl));
	   //構建invoker
        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

3.Cluster是什么

我們只關注一下Invoker這個代理類的創建程序,其他的暫且不關心.

Invoker<T> invoker = cluster.join(directory);

在方法refer中:

Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
    static Cluster getCluster(String name) {
        return getCluster(name, true);
    }

    static Cluster getCluster(String name, boolean wrap) {
        if (StringUtils.isEmpty(name)) {
            name = Cluster.DEFAULT;
        }
        return ExtensionLoader.getExtensionLoader(Cluster.class).getExtension(name, wrap);
    }

1)Cluster$Adaptive

在動態適配的類中會基于extName,選擇一個合適的擴展點進行適配,由于默認情況下cluster:failover,所以getExtension(“failover”)理論上應該回傳FailOverCluster,但實際上,這里做了包裝 MockClusterWrapper(FailOverCluster)

public class Cluster$Adaptive implements org.apache.dubbo.rpc.cluster.Cluster {
  public org.apache.dubbo.rpc.Invoker join(org.apache.dubbo.rpc.cluster.Directory arg0) throws org.apache.dubbo.rpc.RpcException {
    if (arg0 == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argument ==null");
    if (arg0.getUrl() == null) throw new IllegalArgumentException("org.apache.dubbo.rpc.cluster.Directory argumentgetUrl() == null");
    org.apache.dubbo.common.URL url = arg0.getUrl();
    String extName = url.getParameter("cluster", "failover");
    if(extName == null) 
        throw new IllegalStateException("Failed to getextension (org.apache.dubbo.rpc.cluster.Cluster) name from url (" +url.toString() + ") use keys([cluster])");
    org.apache.dubbo.rpc.cluster.Cluster extension =(org.apache.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(org.apa
che.dubbo.rpc.cluster.Cluster.class).getExtension(extName);
    return extension.join(arg0);
 }
}

2)cluster.join

再回到doRefer方法,下面這段代碼, 實際是呼叫MockClusterWrapper(FailOverCluster.join)

Invoker invoker = cluster.join(directory);

所以這里回傳的invoker,應該是MockClusterWrapper(FailOverCluster(directory))

接著回到ReferenceConfig.createProxy方法中的最后一行.

3)proxyFactory.getProxy

拿到invoker之后,會呼叫獲得一個動態代理類

return (T) PROXY_FACTORY.getProxy(invoker, ProtocolUtils.isGeneric(generic));

而,這個PROXY_FACTORY 又是一個自適應擴展點,

    /**
     * A {@link ProxyFactory} implementation that will generate a reference service's proxy,the JavassistProxyFactory is
     * its default implementation
     */
    private static final ProxyFactory PROXY_FACTORY = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();

4)JavassistProxyFactory.getProxy

通過這個方法生成了一個動態代理類,并且對invoker再做了一層處理,InvokerInvocationHandler,意味著后續發起服務呼叫的時候,會由InvokerInvocationHandler來進行處理,

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

5)proxy.getProxy

在proxy.getProxy這個方法中會生成一個動態代理類,通過debug的形式可以看到動態代理類的原貌,在getProxy這個方法位置加一個斷點,

public static Proxy getProxy(Class<?>... ics) {
    return getProxy(ClassUtils.getClassLoader(Proxy.class), ics);
}

@Reference注入的一個物件實體本質上就是一個動態代理類,通過呼叫這個類中的方法,會觸發handler.invoke(), 而這個handler就是InvokerInvocationHandler,

4.網路連接的建立

接下來看 目標服務地址資訊以及網路通信的建立,

1)RegistryProtocol.doRefer

關注directory.subscribe這個方法,它是實作服務目標服務訂閱的,

    private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
        RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
        directory.setRegistry(registry);
        directory.setProtocol(protocol);
        // all attributes of REFER_KEY
        Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
        URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
        if (directory.isShouldRegister()) {
            directory.setRegisteredConsumerUrl(subscribeUrl);
            registry.register(directory.getRegisteredConsumerUrl());
        }
        directory.buildRouterChain(subscribeUrl);
        directory.subscribe(toSubscribeUrl(subscribeUrl));

        Invoker<T> invoker = cluster.join(directory);
        List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
        if (CollectionUtils.isEmpty(listeners)) {
            return invoker;
        }

        RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
        for (RegistryProtocolListener listener : listeners) {
            listener.onRefer(this, registryInvokerWrapper);
        }
        return registryInvokerWrapper;
    }

2)RegistryDirectory.subscribe

訂閱注冊中心指定節點的變化,如果發生變化,則通知到RegistryDirectory,Directory其實和服務的注冊以及服務的發現有非常大的關聯,

    public void subscribe(URL url) {
        setConsumerUrl(url); //設定consumerUrl
        //把當前RegistryDirectory作為listener,去監聽zk上節點的變化
        CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
        serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
        //訂閱 -> 這里的registry是zookeeperRegsitry
        registry.subscribe(url, this);
    }

這里的registry 是ZookeeperRegistry ,會去監聽并獲取路徑下面的節點,監聽的路徑是:
/dubbo/org.apache.dubbo.demo.DemoService/providers/dubbo/org.apache.dubbo.demo.DemoService/configurators/dubbo/org.apache.dubbo.demo.DemoService/routers 節點下面的子節點變動,

3)FailbackRegistry.subscribe

listener為RegistryDirectory,后續要用到,移除失效的listener,呼叫doSubscribe進行訂閱,

    public void subscribe(URL url, NotifyListener listener) {
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Sending a subscription request to the server side
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (CollectionUtils.isNotEmpty(urls)) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup detection is opened, the Exception is thrown directly.
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }

            // Record a failed registration request to a failed list, retry regularly
            addFailedSubscribed(url, listener);
        }
    }

4)ZookeeperRegistry.doSubscribe

這個方法是訂閱,邏輯實作比較多,可以分兩段來看,這里的實作把所有Service層發起的訂閱以及指定的Service層發起的訂閱分開處理,所有Service層類似于監控中心發起的訂閱,指定的Service層發起的訂閱可以看作是服務消費者的訂閱,我們只需要關心指定service層發起的訂閱即可,

    public void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (ANY_VALUE.equals(url.getServiceInterface())) {
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> {
                    for (String child : currentChilds) {
                        child = URL.decode(child);
                        if (!anyServices.contains(child)) {
                            anyServices.add(child);
                            subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
                                    Constants.CHECK_KEY, String.valueOf(false)), k);
                        }
                    }
                });
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (CollectionUtils.isNotEmpty(services)) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                List<URL> urls = new ArrayList<>();
                for (String path : toCategoriesPath(url)) {
                    // 如果之前該路徑沒有添加過listener,則創建一個map來放置listener
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
                    // 如果沒有添加過對于子節點的listener,則創建,通知服務變化 回呼NotifyListener
                    ChildListener zkListener = listeners.computeIfAbsent(listener, k -> (parentPath, currentChilds) -> ZookeeperRegistry.this.notify(url, k, toUrlsWithEmpty(url, parentPath, currentChilds)));
                    zkClient.create(path, false);
                    //添加path節點的當前節點及子節點監聽,并且獲取子節點資訊
                    //也就是dubbo://ip:port/...
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                //呼叫notify進行通知,對已經可用的串列進行通知
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

5)FailbackRegistry.notify

呼叫FailbackRegistry.notify, 對引數進行判斷, 然后呼叫AbstractRegistry.notify方法,

    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // Record a failed registration request to a failed list, retry regularly
            addFailedNotified(url, listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

6)AbstractRegistry.notify

這里面會針對每一個category,呼叫listener.notify進行通知,然后更新本地的快取檔案

protected void notify(URL url, NotifyListener listener, List<URL> urls) {
    if (url == null) {
        throw new IllegalArgumentException("notify url == null");
    }
    if (listener == null) {
        throw new IllegalArgumentException("notify listener == null");
    }
    if ((CollectionUtils.isEmpty(urls))
            && !ANY_VALUE.equals(url.getServiceInterface())) {
        logger.warn("Ignore empty notify urls for subscribe url " + url);
        return;
    }
    if (logger.isInfoEnabled()) {
        logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
    }
    // keep every provider's category.
    Map<String, List<URL>> result = new HashMap<>();
    for (URL u : urls) {
        if (UrlUtils.isMatch(url, u)) {
            String category = u.getParameter(CATEGORY_KEY, DEFAULT_CATEGORY);
            List<URL> categoryList = result.computeIfAbsent(category, k -> new ArrayList<>());
            categoryList.add(u);
        }
    }
    if (result.size() == 0) {
        return;
    }
    Map<String, List<URL>> categoryNotified = notified.computeIfAbsent(url, u -> new ConcurrentHashMap<>());
    for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
        String category = entry.getKey();
        List<URL> categoryList = entry.getValue();
        categoryNotified.put(category, categoryList);
        listener.notify(categoryList);
        // We will update our cache file after each notification.
        // When our Registry has a subscribe failure due to network jitter, we can return at least the existing cache URL.
        saveProperties(url);
    }
}

消費端的listener是最開始傳遞過來的RegistryDirectory,所以這里會觸發RegistryDirectory.notify.

7)RegistryDirectory.notify

Invoker的網路連接以及后續的配置變更,都會呼叫這個notify方法

urls: zk的path資料,這里表示的是dubbo://

public synchronized void notify(List<URL> urls) {
    //對url串列進行校驗、過濾,然后分成 config、router、provider 3個分組map
    Map<String, List<URL>> categoryUrls = urls.stream()
            .filter(Objects::nonNull)
            .filter(this::isValidCategory)
            .filter(this::isNotCompatibleFor26x)
            .collect(Collectors.groupingBy(this::judgeCategory));

    List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList());
    this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators);
	// 如果router 路由節點有變化,則從新將router 下的資料生成router
    List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList());
    toRouters(routerURLs).ifPresent(this::addRouters);

    // 獲得provider URL,然后呼叫refreshOverrideAndInvoker進行重繪
    List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList());
    /**
     * 3.x added for extend URL address
     */
    ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class);
    List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null);
    if (supportedListeners != null && !supportedListeners.isEmpty()) {
        for (AddressListener addressListener : supportedListeners) {
            providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this);
        }
    }
    refreshOverrideAndInvoker(providerURLs);
}

8)refreshOverrideAndInvoker

  • 逐個呼叫注冊中心里面的配置,覆寫原來的url,組成最新的url 放入overrideDirectoryUrl 存盤
  • 根據 provider urls,重新重繪Invoker
    private void refreshOverrideAndInvoker(List<URL> urls) {
        // mock zookeeper://xxx?mock=return null
        overrideDirectoryUrl();
        refreshInvoker(urls);
    }

9)refreshInvoker

    private void refreshInvoker(List<URL> invokerUrls) {
        Assert.notNull(invokerUrls, "invokerUrls should not be null");

        if (invokerUrls.size() == 1
               //...
            //如果是空協議,則直接回傳不允許訪問
        } else {
            this.forbidden = false; // Allow to access
            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
            if (invokerUrls == Collections.<URL>emptyList()) {//如果url為空
                invokerUrls = new ArrayList<>();
            }
            if (invokerUrls.isEmpty() && this.cachedInvokerUrls != null) {
                invokerUrls.addAll(this.cachedInvokerUrls);
            } else {
                this.cachedInvokerUrls = new HashSet<>();
                this.cachedInvokerUrls.addAll(invokerUrls);//Cached invoker urls, convenient for comparison
            }
            if (invokerUrls.isEmpty()) {
                return;
            }
            //根據provider url,生成新的invoker
            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map

            /**
             * If the calculation is wrong, it is not processed.
             *
             * 1. The protocol configured by the client is inconsistent with the protocol of the server.
             *    eg: consumer protocol = dubbo, provider only has other protocol services(rest).
             * 2. The registration center is not robust and pushes illegal specification data.
             *
             */
            if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :" + invokerUrls.size() + ", invoker.size :0. urls :" + invokerUrls
                        .toString()));
                return;
            }

            List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
           
            routerChain.setInvokers(newInvokers);
             //如果服務配置了分組,則把分組下的provider包裝成StaticDirectory,組成一個invoker
            //實際上就是按照group進行合并
            this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
            this.urlInvokerMap = newUrlInvokerMap;

            try {
                //舊的url 是否在新map里面存在,不存在,就是銷毀url對應的Invoker
                destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the unused Invoker
            } catch (Exception e) {
                logger.warn("destroyUnusedInvokers error. ", e);
            }
        }
    }

10)toInvokers

這里用到了protocol.refer來構建了一個invoker.invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url,providerUrl);

構建完成之后,會保存在 Map<String, Invoker<T>> urlInvokerMap 這個集合中,

    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
        if (urls == null || urls.isEmpty()) {
            return newUrlInvokerMap;
        }
        Set<String> keys = new HashSet<>();
        String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
        for (URL providerUrl : urls) {
            // If protocol is configured at the reference side, only the matching protocol is selected
            if (queryProtocols != null && queryProtocols.length() > 0) {
                boolean accept = false;
                String[] acceptProtocols = queryProtocols.split(",");
                for (String acceptProtocol : acceptProtocols) {
                    if (providerUrl.getProtocol().equals(acceptProtocol)) {
                        accept = true;
                        break;
                    }
                }
                if (!accept) {
                    continue;
                }
            }
            if (EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) {
                continue;
            }
            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) {
                logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() +
                        " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() +
                        " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                        ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                continue;
            }
            URL url = mergeUrl(providerUrl);

            String key = url.toFullString(); // The parameter urls are sorted
            if (keys.contains(key)) { // Repeated url
                continue;
            }
            keys.add(key);
            // Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
            Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
            Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
            if (invoker == null) { // Not in the cache, refer again
                try {
                    boolean enabled = true;
                    if (url.hasParameter(DISABLED_KEY)) {
                        enabled = !url.getParameter(DISABLED_KEY, false);
                    } else {
                        enabled = url.getParameter(ENABLED_KEY, true);
                    }
                    if (enabled) {
                        invoker = new InvokerDelegate<>(protocol.refer(serviceType, url), url, providerUrl);
                    }
                } catch (Throwable t) {
                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
                }
                if (invoker != null) { // Put new invoker in cache
                    newUrlInvokerMap.put(key, invoker);
                }
            } else {
                newUrlInvokerMap.put(key, invoker);
            }
        }
        keys.clear();
        return newUrlInvokerMap;
    }

5.protocol.refer

呼叫指定的協議來進行遠程參考,protocol是一個Protocol$Adaptive類,而真正的實作應該是:

ProtocolListenerWrapper(ProtocolFilterWrapper(QosProtocolWrapper(DubboProtocol.refer)

前面的包裝程序,在服務發布的時候已經分析過了,我們直接進入DubboProtocol.protocolBindingRefer方法

1)DubboProtocol.protocolBindingRefer

  • 優化序列化
  • 構建DubboInvoker

在構建DubboInvoker時,會構建一個ExchangeClient,通過getClients(url)方法,這里基本可以猜到到是服務的通信建立,

    public <T> Invoker<T> protocolBindingRefer(Class<T> serviceType, URL url) throws RpcException {
        optimizeSerialization(url);

        // create rpc invoker.
        DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
        invokers.add(invoker);

        return invoker;
    }

2)getClients

這里面是獲得客戶端連接的方法

  • 判斷是否為共享連接,默認是共享同一個連接進行通信
  • 是否配置了多個連接通道 connections,默認只有一個
    private ExchangeClient[] getClients(URL url) {
        // whether to share connection

        boolean useShareConnect = false;

        int connections = url.getParameter(CONNECTIONS_KEY, 0);
        List<ReferenceCountExchangeClient> shareClients = null;
        //如果沒有配置連接數,則默認為共享連接
        if (connections == 0) {
            useShareConnect = true;

            /*
             * The xml configuration should have a higher priority than properties.
             */
            String shareConnectionsStr = url.getParameter(SHARE_CONNECTIONS_KEY, (String) null);
            connections = Integer.parseInt(StringUtils.isBlank(shareConnectionsStr) ? ConfigUtils.getProperty(SHARE_CONNECTIONS_KEY,
                    DEFAULT_SHARE_CONNECTIONS) : shareConnectionsStr);
            ///
            shareClients = getSharedClient(url, connections);
        }

        ExchangeClient[] clients = new ExchangeClient[connections];
        for (int i = 0; i < clients.length; i++) {
            if (useShareConnect) {
                clients[i] = shareClients.get(i);

            } else {
                clients[i] = initClient(url);
            }
        }

        return clients;
    }

3)getSharedClient

獲得一個共享連接

    private List<ReferenceCountExchangeClient> getSharedClient(URL url, int connectNum) {
        String key = url.getAddress();
        List<ReferenceCountExchangeClient> clients = referenceClientMap.get(key);
        //檢查當前的key檢查連接是否已經創建過并且可用,如果是,則直接回傳并且增加連接的個數的統計
        if (checkClientCanUse(clients)) {
            batchClientRefIncr(clients);
            return clients;
        }
        //如果連接已經關倍訓者連接沒有創建過
        locks.putIfAbsent(key, new Object());
        synchronized (locks.get(key)) {
            clients = referenceClientMap.get(key);
            // dubbo check
            // 在創建連接之前,在做一次檢查,防止連接并發創建
            if (checkClientCanUse(clients)) {
                batchClientRefIncr(clients);
                return clients;
            }

            // connectNum must be greater than or equal to 1
            // 連接數必須大于等于1
            connectNum = Math.max(connectNum, 1);

            // If the clients is empty, then the first initialization is
            //如果當前消費者還沒有和服務端產生連接,則初始化
            if (CollectionUtils.isEmpty(clients)) {
                clients = buildReferenceCountExchangeClientList(url, connectNum);
                //創建clients之后,保存到map中
                referenceClientMap.put(key, clients);

            } else {//如果clients不為空,則從clients陣列中進行遍歷
                for (int i = 0; i < clients.size(); i++) {
                    ReferenceCountExchangeClient referenceCountExchangeClient = clients.get(i);
                    // If there is a client in the list that is no longer available, create a new one to replace him.
                    // 如果在集合中存在一個連接但是這個連接處于closed狀態,則重新構建一個進行替換
                    if (referenceCountExchangeClient == null || referenceCountExchangeClient.isClosed()) {
                        clients.set(i, buildReferenceCountExchangeClient(url));
                        continue;
                    }
					//增加個數
                    referenceCountExchangeClient.incrementAndGetCount();
                }
            }


            locks.remove(key);

            return clients;
        }
    }

4)buildReferenceCountExchangeClientList

根據連接數配置,來構建指定個數的鏈接,默認為1

    private ReferenceCountExchangeClient buildReferenceCountExchangeClient(URL url) {
        ExchangeClient exchangeClient = initClient(url);

        return new ReferenceCountExchangeClient(exchangeClient);
    }

5)initClient

進入到初始化客戶端連接的方法了,猜測應該是根據url中配置的引數進行遠程通信的構建

private ExchangeClient initClient(URL url) {

    // client type setting.
    // 獲得連接型別
    String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
	//添加默認序列化方式
    url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
    // enable heartbeat by default
    //設定心跳時間
    url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));

    // BIO is not allowed since it has severe performance issue.
    // 判斷str是否存在于擴展點中,如果不存在則直接報錯
    if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
        throw new RpcException("Unsupported client type: " + str + "," +
                " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " "));
    }

    ExchangeClient client;
    try {
        // connection should be lazy
        // 是否需要延遲創建連接,注意,這里的requestHandler是一個配接器
        if (url.getParameter(LAZY_CONNECT_KEY, false)) {
            client = new LazyConnectExchangeClient(url, requestHandler);

        } else {
            client = Exchangers.connect(url, requestHandler);
        }

    } catch (RemotingException e) {
        throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e);
    }

    return client;
}

6)Exchangers.connect

創建一個客戶端連接

public static ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    if (url == null) {
        throw new IllegalArgumentException("url == null");
    }
    if (handler == null) {
        throw new IllegalArgumentException("handler == null");
    }
    url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
    return getExchanger(url).connect(url, handler);
}

7)HeaderExchange.connect

主要關注transporters.connect

public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException {
    return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler))), true);
}

8)NettyTransport.connect

使用netty構建了一個客戶端連接

    public Client connect(URL url, ChannelHandler handler) throws RemotingException {
        return new NettyClient(url, handler);
    }

6.梳理

RegistryProtocol.refer 程序中有一個關鍵步驟,即在監聽到服務提供者url時觸發RegistryDirectory.notify() 方法,

RegistryDirectory.notify() 方法呼叫 refreshInvoker() 方法將服務提供者urls轉換為對應的 遠程invoker ,最終呼叫到 DubboProtocol.refer() 方法生成對應的 DubboInvoker ,

DubboInvoker 的構造方法中有一項入參 ExchangeClient[] clients ,即對應本文要講的網路客戶端 Client ,DubboInvoker就是通過呼叫 client.request() 方法完成網路通信的請求發送和回應接收功能,

Client 的具體生成程序就是通過 DubboProtocol 的 initClient(URL url) 方法創建了一個HeaderExchangeClient ,

在這里插入圖片描述

7.客戶端生成Proxy

1)JavassistProxyFactory.getProxy

    public <T> T getProxy(Invoker<T> invoker, Class<?>[] interfaces) {
        return (T) Proxy.getProxy(interfaces).newInstance(new InvokerInvocationHandler(invoker));
    }

這個invoker實際上是:MockClusterWrapper(FailoverCluster(directory))然后通過InvokerInvocationHandler做了一層包裝變成了InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))

2)proxy.getProxy

這個方法里面,會生成一個動態代理的方法,我們通過debug可以看到動態位元組碼的拼接程序,它代理了當前這個介面的方法 sayHello , 并且方法里面是使用handler.invoke進行呼叫的,

而handler又是這樣一個實作:InvokerInvocationHandler(MockClusterWrapper(FailoverCluster(directory)))

public java.lang.String sayHello(java.lang.String arg0){
     Object[] args = new Object[1];
     args[0] = ($w)$1;
     Object ret = handler.invoke(this, methods[0], args);
    return (java.lang.String)ret;
}

8.消費端呼叫的程序

1)nvokerInvocationHandler.invoke

這個方法主要判斷當前呼叫的遠程方法,如果是tostring、hashcode、equals,就直接回傳否則,呼叫invoker.invoke,進入到 MockClusterWrapper.invoke 方法,

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
    if (method.getDeclaringClass() == Object.class) {
        return method.invoke(invoker, args);
    }
    String methodName = method.getName();
    Class<?>[] parameterTypes = method.getParameterTypes();
    if (parameterTypes.length == 0) {
        if ("toString".equals(methodName)) {
            return invoker.toString();
        } else if ("$destroy".equals(methodName)) {
            invoker.destroy();
            return null;
        } else if ("hashCode".equals(methodName)) {
            return invoker.hashCode();
        }
    } else if (parameterTypes.length == 1 && "equals".equals(methodName)) {
        return invoker.equals(args[0]);
    }
    RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
    String serviceKey = invoker.getUrl().getServiceKey();
    rpcInvocation.setTargetServiceUniqueName(serviceKey);
  
    if (consumerModel != null) {
        rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
        rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));
    }

    return invoker.invoke(rpcInvocation).recreate();
}

2)MockClusterInvoker.invoke

  • 是否客戶端強制配置了mock呼叫,那么在這種場景中主要可以用來解決服務端還沒開發好的時候直接使用本地資料進行測驗
  • 是否出現了例外,如果出現例外則使用配置好的Mock類來實作服務的降級
public Result invoke(Invocation invocation) throws RpcException {
    Result result = null;
    //從url中獲得MOCK_KEY對應的value
    String value = getUrl().getMethodParameter(invocation.getMethodName(), MOCK_KEY, Boolean.FALSE.toString()).trim();
    //如果沒有配置mock,則直接傳遞給下個invoker呼叫
    if (value.length() == 0 || "false".equalsIgnoreCase(value)) {
        //no mock
        result = this.invoker.invoke(invocation);
    } else if (value.startsWith("force")) {//如果強制為本地呼叫,則執行mockInvoke
        if (logger.isWarnEnabled()) {
            logger.warn("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + getUrl());
        }
        //force:direct mock
        result = doMockInvoke(invocation, null);
    } else {
        //fail-mock
        try {
            result = this.invoker.invoke(invocation);

            //fix:#4585
            if(result.getException() != null && result.getException() instanceof RpcException){
                RpcException rpcException= (RpcException)result.getException();
                if(rpcException.isBiz()){
                    throw  rpcException;
                }else {
                    result = doMockInvoke(invocation, rpcException);
                }
            }

        } catch (RpcException e) {
            if (e.isBiz()) {
                throw e;
            }
			//如果遠程呼叫出現例外,則使用Mock進行處理
            if (logger.isWarnEnabled()) {
                logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + getUrl(), e);
            }
            result = doMockInvoke(invocation, e);
        }
    }
    return result;
}

3)AbstractClusterInvoker.invoke

public Result invoke(final Invocation invocation) throws RpcException {
    checkWhetherDestroyed();

    // binding attachments into invocation.
    //Dubbo中,可以通過 RpcContext 上的 setAttachment 和 getAttachment 在服務消費方和提供方之間進行引數的隱式傳遞
    Map<String, Object> contextAttachments = RpcContext.getContext().getObjectAttachments();
    if (contextAttachments != null && contextAttachments.size() != 0) {
        ((RpcInvocation) invocation).addObjectAttachments(contextAttachments);
    }
	//通過list獲得invoker串列,這個串列基本可以猜測到是從directory里面獲得的、但是這里面還實作了服務路由的邏輯,
    //簡單來說就是先拿到invoker串列,然后通過router進行服務路由,篩選出符合路由規則的服務提供者
    List<Invoker<T>> invokers = list(invocation);
    //初始化負載均衡機制
    LoadBalance loadbalance = initLoadBalance(invokers, invocation);
    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);
    return doInvoke(invocation, invokers, loadbalance);
}

4)initLoadBalance

初始化負載均衡

protected LoadBalance initLoadBalance(List<Invoker<T>> invokers, Invocation invocation) {
    if (CollectionUtils.isNotEmpty(invokers)) {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(invokers.get(0).getUrl()
                .getMethodParameter(RpcUtils.getMethodName(invocation), LOADBALANCE_KEY, DEFAULT_LOADBALANCE));
    } else {
        return ExtensionLoader.getExtensionLoader(LoadBalance.class).getExtension(DEFAULT_LOADBALANCE);
    }
}

5)FailoverClusterInvoker.doInvoke

failover是失敗重試,所以這里面應該會實作容錯的邏輯

  • 獲得重試的次數,并且進行回圈
  • 獲得目標服務,并且記錄當前已經呼叫過的目標服務防止下次繼續將請求發送過去
  • 如果執行成功,則回傳結果
  • 如果出現例外,判斷是否為業務例外,如果是則拋出,否則,進行下一次重試

在這里插入圖片描述

  • 這里的 Invoker 是 Provider 的一個可呼叫 Service 的抽象, Invoker 封裝了 Provider 地址及 Service 介面資訊
  • Directory 代表多個 Invoker ,可以把它看成 List<Invoker> ,但與 List 不同的是,它的值可能是動態變化的,比如注冊中心推送變更
  • Cluster 將 Directory 中的多個 Invoker 偽裝成一個 Invoker ,對上層透明,偽裝程序包含了容錯邏輯,呼叫失敗后,重試另一個
  • Router 負責從多個 Invoker 中按路由規則選出子集,比如讀寫分離,應用隔離等
  • LoadBalance 負責從多個 Invoker 中選出具體的一個用于本次呼叫,選的程序包含了負載均衡演算法,呼叫失敗后,需要重選
public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException {
    List<Invoker<T>> copyInvokers = invokers;
    checkInvokers(copyInvokers, invocation);
    String methodName = RpcUtils.getMethodName(invocation);
    int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1;
    if (len <= 0) {
        len = 1;
    }
    // retry loop.
    RpcException le = null; // last exception.
    List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers.
    Set<String> providers = new HashSet<String>(len);
    for (int i = 0; i < len; i++) {
        //Reselect before retry to avoid a change of candidate `invokers`.
        //NOTE: if `invokers` changed, then `invoked` also lose accuracy.
        if (i > 0) {
            checkWhetherDestroyed();
            copyInvokers = list(invocation);
            // check again
            checkInvokers(copyInvokers, invocation);
        }
        //通過負載均衡獲得目標invoker
        Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked);
        invoked.add(invoker);//記錄已經呼叫過的服務,下次呼叫會進行過濾
        RpcContext.getContext().setInvokers((List) invoked);
        try {
            //服務呼叫成功,直接回傳結果
            Result result = invoker.invoke(invocation);
            if (le != null && logger.isWarnEnabled()) {
                
            }
            return result;
        } catch (RpcException e) {
            if (e.isBiz()) { // 如果是業務例外,直接拋出不進行重試
                throw e;
            }
            le = e;//記錄例外資訊,進行下一次回圈
        } catch (Throwable e) {
            le = new RpcException(e.getMessage(), e);
        } finally {
            providers.add(invoker.getUrl().getAddress());
        }
    }
    throw new RpcException(le.getCode());
}

9.負載均衡

1)select

所有負載均衡實作類均繼承自 AbstractLoadBalance,該類實作了 LoadBalance 介面,并封裝了一些公共的邏輯,所以在分析負載均衡實作之前,先來看一下 AbstractLoadBalance 的邏輯,首先來看一下負載均衡的入口方法 select,如下:

protected Invoker<T> select(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    String methodName = invocation == null ? StringUtils.EMPTY_STRING : invocation.getMethodName();

    boolean sticky = invokers.get(0).getUrl()
            .getMethodParameter(methodName, CLUSTER_STICKY_KEY, DEFAULT_CLUSTER_STICKY);

    //ignore overloaded method
    if (stickyInvoker != null && !invokers.contains(stickyInvoker)) {
        stickyInvoker = null;
    }
    //ignore concurrency problem
    if (sticky && stickyInvoker != null && (selected == null || !selected.contains(stickyInvoker))) {
        if (availablecheck && stickyInvoker.isAvailable()) {
            return stickyInvoker;
        }
    }
    //呼叫 doSelect 方法進行負載均衡,該方法為抽象方法,由子類實作
    Invoker<T> invoker = doSelect(loadbalance, invocation, invokers, selected);

    if (sticky) {
        stickyInvoker = invoker;
    }
    return invoker;
}

2)doSelect

private Invoker<T> doSelect(LoadBalance loadbalance, Invocation invocation,
                            List<Invoker<T>> invokers, List<Invoker<T>> selected) throws RpcException {

    if (CollectionUtils.isEmpty(invokers)) {
        return null;
    }
    //如果就一個,就不負載均衡了,直接回傳
    if (invokers.size() == 1) {
        return invokers.get(0);
    }
    //負載均衡邏輯
    Invoker<T> invoker = loadbalance.select(invokers, getUrl(), invocation);

    //If the `invoker` is in the  `selected` or invoker is unavailable && availablecheck is true, reselect.
    if ((selected != null && selected.contains(invoker))
            || (!invoker.isAvailable() && getUrl() != null && availablecheck)) {
        try {
            Invoker<T> rInvoker = reselect(loadbalance, invocation, invokers, selected, availablecheck);
            if (rInvoker != null) {
                invoker = rInvoker;
            } else {
                
                int index = invokers.indexOf(invoker);
                try {
                   
                    invoker = invokers.get((index + 1) % invokers.size());
                } catch (Exception e) {
                    
            }
        } catch (Throwable t) {
            
        }
    }
    return invoker;
}

3)select

@SPI(RandomLoadBalance.NAME)
public interface LoadBalance {

    @Adaptive("loadbalance")
    <T> Invoker<T> select(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException;

}

這里應該選擇的RandomLoadBalance,默認情況下,

4)RandomLoadBalance.doSelect

protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation) {
    // Number of invokers
    int length = invokers.size();
    // Every invoker has the same weight?
    boolean sameWeight = true;
    // the weight of every invokers
    int[] weights = new int[length];
    // the first invoker's weight
    int firstWeight = getWeight(invokers.get(0), invocation);
    weights[0] = firstWeight;
    // The sum of weights
    int totalWeight = firstWeight;
    for (int i = 1; i < length; i++) {
        int weight = getWeight(invokers.get(i), invocation);
        // save for later use
        weights[i] = weight;
        // Sum
        totalWeight += weight;
        if (sameWeight && weight != firstWeight) {
            sameWeight = false;
        }
    }
    if (totalWeight > 0 && !sameWeight) {
        // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight.
        int offset = ThreadLocalRandom.current().nextInt(totalWeight);
        // Return a invoker based on the random value.
        for (int i = 0; i < length; i++) {
            offset -= weights[i];
            if (offset < 0) {
                return invokers.get(i);
            }
        }
    }
    // If all invokers have the same weight value or totalWeight=0, return evenly.
    return invokers.get(ThreadLocalRandom.current().nextInt(length));
}

10.DubboInvoker

1)AbstractInvoker.invoke

這里面也是對Invocation的attachments進行處理,把attachment加入到Invocation中,

這里的attachment,實際上是目標服務的介面資訊以及版本資訊,

2)DubboInvoker.doInvoker

這里面看到一個很熟悉的東西,就是ExchangeClient,這個是客戶端和服務端之間的連接,

然后如果當前方法有回傳值,也就是isOneway=false,則執行else邏輯,然后通過異步的形式進行通信,

    protected Result doInvoke(final Invocation invocation) throws Throwable {
        RpcInvocation inv = (RpcInvocation) invocation;
        final String methodName = RpcUtils.getMethodName(invocation);
        inv.setAttachment(PATH_KEY, getUrl().getPath());
        inv.setAttachment(VERSION_KEY, version);

        ExchangeClient currentClient;
        if (clients.length == 1) {
            currentClient = clients[0];
        } else {
            currentClient = clients[index.getAndIncrement() % clients.length];
        }
        try {
            boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);
            int timeout = calculateTimeout(invocation, methodName);
            if (isOneway) {
                boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);
                currentClient.send(inv, isSent);
                return AsyncRpcResult.newDefaultAsyncResult(invocation);
            } else {
                ExecutorService executor = getCallbackExecutor(getUrl(), inv);
                CompletableFuture<AppResponse> appResponseFuture =
                        currentClient.request(inv, timeout, executor).thenApply(obj -> (AppResponse) obj);
                // save for 2.6.x compatibility, for example, TraceFilter in Zipkin uses com.alibaba.xxx.FutureAdapter
                FutureContext.getContext().setCompatibleFuture(appResponseFuture);
                AsyncRpcResult result = new AsyncRpcResult(appResponseFuture, inv);
                result.setExecutor(executor);
                return result;
            }
        } catch (TimeoutException e) {
            throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        } catch (RemotingException e) {
            throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }

3)currentClient.request

它實際是一個ReferenceCountExchangeClient(HeaderExchangeClient())

所以它的呼叫鏈路是:

ReferenceCountExchangeClient->HeaderExchangeClient->HeaderExchangeChannel->(request方法)

最終,把構建好的 RpcInvocation,組裝到一個Request物件中進行傳遞

    public CompletableFuture<Object> request(Object request, int timeout, ExecutorService executor) throws RemotingException {
        if (closed) {
            throw new RemotingException(this.getLocalAddress(), null, "Failed to send request " + request + ", cause: The channel " + this + " is closed!");
        }
        // create request.
        Request req = new Request();
        req.setVersion(Version.getProtocolVersion());
        req.setTwoWay(true);
        req.setData(request);
        DefaultFuture future = DefaultFuture.newFuture(channel, req, timeout, executor);
        try {
            channel.send(req);
        } catch (RemotingException e) {
            future.cancel();
            throw e;
        }
        return future;
    }

channel.send的呼叫鏈路:

AbstractPeer.send ->AbstractClient.send->NettyChannel.send

通過NioSocketChannel把訊息發送出去

ChannelFuture future = channel.writeAndFlush(message);

十五,服務端接收訊息的處理流程

客戶端把訊息發送出去之后,服務端會收到訊息,然后把執行的結果回傳到客戶端

1.服務端接收到訊息

服務端這邊接收訊息的處理鏈路,也比較復雜,我們回到NettServer中創建io的程序.

bootstrap.group(bossGroup, workerGroup)
       .channel(NioServerSocketChannel.class)
       .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
       .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
       .childOption(ChannelOption.ALLOCATOR,
PooledByteBufAllocator.DEFAULT)
       .childHandler(new ChannelInitializer<NioSocketChannel>() {
          @Override
          protected void initChannel(NioSocketChannel ch) throws
Exception {
            // FIXME: should we use getTimeout()?
            int idleTimeout = UrlUtils.getIdleTimeout(getUrl());
            NettyCodecAdapter adapter = new
NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
            ch.pipeline()//.addLast("logging",new
LoggingHandler(LogLevel.INFO))//for debug
               .addLast("decoder", adapter.getDecoder())
               .addLast("encoder", adapter.getEncoder())
               .addLast("server-idle-handler", new
IdleStateHandler(0, 0, idleTimeout, MILLISECONDS))
               .addLast("handler", nettyServerHandler);
         }
       });

handler配置的是nettyServerHandler

server-idle-handler 表示心跳處理的機制

final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(),this);

Handler與Servlet中的filter很像,通過Handler可以完成通訊報文的解碼編碼、攔截指定的報文、統一對日志錯誤進行處理、統一對請求進行計數、控制Handler執行與否,

2.handler.channelRead()

服務端收到讀的請求是,會進入這個方法,

接著通過handler.received來處理msg,這個handle的鏈路很長,比較復雜,我們需要逐步剖析

@Override
  public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    
      NettyChannel channel = NettyChannel.getOrAddChannel(ctx.channel(), url,handler);
    try {
      handler.received(channel, msg);
   } finally {
      NettyChannel.removeChannelIfDisconnected(ctx.channel());
   }
 }

handler->MultiMessageHandler->HeartbeatHandler->AllChannelHandler->DecodeHandler-HeaderExchangeHandler->最后進入這個方法->DubboProtocol$requestHandler(receive)

MultiMessageHandler: 復合訊息處理

HeartbeatHandler:心跳訊息處理,接收心跳并發送心跳回應

AllChannelHandler:業務執行緒轉化處理器,把接收到的訊息封裝成ChannelEventRunnable可執行任務給執行緒池處理

DecodeHandler:業務解碼處理器

3.HeaderExchangeHandler.received

互動層請求回應處理,有三種處理方式

  1. handlerRequest,雙向請求
  2. handler.received 單向請求
  3. handleResponse 回應訊息
public void received(Channel channel, Object message) throws RemotingException {
   
    channel.setAttribute(KEY_READ_TIMESTAMP, System.currentTimeMillis());
    final ExchangeChannel exchangeChannel =HeaderExchangeChannel.getOrAddChannel(channel);
    try {
      if (message instanceof Request) {
        // handle request.
        Request request = (Request) message;
        if (request.isEvent()) {
          handlerEvent(channel, request);
       } else {
          if (request.isTwoWay()) {
            handleRequest(exchangeChannel, request);
         } else {
            handler.received(exchangeChannel, request.getData());
         }
       }
     } else if (message instanceof Response) {
        handleResponse(channel, (Response) message);
     } else if (message instanceof String) {
        if (isClientSide(channel)) {
          Exception e = new Exception("Dubbo client can not supported string message: " + message + " in channel: " + channel + ", url: " + channel.getUrl());
          logger.error(e.getMessage(), e);
       } else {
          String echo = handler.telnet(channel, (String) message);
          if (echo != null && echo.length() > 0) {
            channel.send(echo);
         }
       }
     } else {
        handler.received(exchangeChannel, message);
     }
   } finally {
      HeaderExchangeChannel.removeChannelIfDisconnected(channel);
   }
 }

4.ExchangeHandler.reply

接著進入到ExchangeHandler.reply這個方法中

  • 把message轉化為Invocation
  • 呼叫getInvoker獲得一個Invoker物件
  • 然后通過 Result result = invoker.invoke(inv); 進行呼叫
public CompletableFuture<Object> reply(ExchangeChannel channel, Object message) throws RemotingException {
            if (!(message instanceof Invocation)) {
                throw new RemotingException(channel, "Unsupported request: " + (message == null ? null : message.getClass().getName() + ": " + message) + ", channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress());
            } else {
                Invocation inv = (Invocation)message;
                Invoker<?> invoker = DubboProtocol.this.getInvoker(channel, inv);
                if (Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("_isCallBackServiceInvoke"))) {
                    String methodsStr = (String)invoker.getUrl().getParameters().get("methods");
                    boolean hasMethod = false;
                    if (methodsStr != null && methodsStr.contains(",")) {
                        String[] methods = methodsStr.split(",");
                        String[] var8 = methods;
                        int var9 = methods.length;

                        for(int var10 = 0; var10 < var9; ++var10) {
                            String method = var8[var10];
                            if (inv.getMethodName().equals(method)) {
                                hasMethod = true;
                                break;
                            }
                        }
                    } else {
                        hasMethod = inv.getMethodName().equals(methodsStr);
                    }

                    if (!hasMethod) {
                        DubboProtocol.this.logger.warn(new IllegalStateException("The methodName " + inv.getMethodName() + " not found in callback service interface ,invoke will be ignored. please update the api interface. url is:" + invoker.getUrl()) + " ,invocation is :" + inv);
                        return null;
                    }
                }

                RpcContext.getContext().setRemoteAddress(channel.getRemoteAddress());
                Result result = invoker.invoke(inv);
                return result.thenApply(Function.identity());
            }
        }

5.getInvoker

這里面是獲得一個invoker的實作DubboExporter<?> exporter = (DubboExporter<?>) exporterMap.get(serviceKey);

這段代碼非常熟悉,exporterMap不就是我們之前在分析服務發布的程序中,保存的Invoker嗎?而key,就是對應的interface:port

    Invoker<?> getInvoker(Channel channel, Invocation inv) throws RemotingException {
        boolean isCallBackServiceInvoke = false;
        boolean isStubServiceInvoke = false;
        int port = channel.getLocalAddress().getPort();
        String path = (String)inv.getObjectAttachments().get("path");
        isStubServiceInvoke = Boolean.TRUE.toString().equals(inv.getObjectAttachments().get("dubbo.stub.event"));
        if (isStubServiceInvoke) {
            port = channel.getRemoteAddress().getPort();
        }

        isCallBackServiceInvoke = this.isClientSide(channel) && !isStubServiceInvoke;
        if (isCallBackServiceInvoke) {
            path = path + "." + inv.getObjectAttachments().get("callback.service.instid");
            inv.getObjectAttachments().put("_isCallBackServiceInvoke", Boolean.TRUE.toString());
        }

        String serviceKey = serviceKey(port, path, (String)inv.getObjectAttachments().get("version"), (String)inv.getObjectAttachments().get("group"));
        DubboExporter<?> exporter = (DubboExporter)this.exporterMap.get(serviceKey);
        if (exporter == null) {
            throw new RemotingException(channel, "Not found exported service: " + serviceKey + " in " + this.exporterMap.keySet() + ", may be version or group mismatch , channel: consumer: " + channel.getRemoteAddress() + " --> provider: " + channel.getLocalAddress() + ", message:" + this.getInvocationWithoutData(inv));
        } else {
            return exporter.getInvoker();
        }
    }

6.exporterMap

Map<String, Exporter<?>> exporterMap = new ConcurrentHashMap<String, Exporter<?>>();

在服務發布時,實際上是把invoker包裝成了DubboExpoter,然后放入到exporterMap中,

    public <T> Exporter<T> export(Invoker<T> invoker) throws RpcException {
        URL url = invoker.getUrl();
        String key = serviceKey(url);
        DubboExporter<T> exporter = new DubboExporter(invoker, key, this.exporterMap);
        this.exporterMap.put(key, exporter);
        Boolean isStubSupportEvent = url.getParameter("dubbo.stub.event", false);
        Boolean isCallbackservice = url.getParameter("is_callback_service", false);
        if (isStubSupportEvent && !isCallbackservice) {
            String stubServiceMethods = url.getParameter("dubbo.stub.event.methods");
            if ((stubServiceMethods == null || stubServiceMethods.length() == 0) && this.logger.isWarnEnabled()) {
                this.logger.warn(new IllegalStateException("consumer [" + url.getParameter("interface") + "], has set stubproxy support event ,but no stub methods founded."));
            }
        }

        this.openServer(url);
        this.optimizeSerialization(url);
        return exporter;
    }

7.invoker.invoke(inv)

接著呼叫invoker.invoke

此時的invoker是一個什么呢?

invoker=ProtocolFilterWrapper(InvokerDelegate(DelegateProviderMetaDataInvoker(AbstractProxyInvoker)))

最后一定會進入到這個代碼里面

8.AbstractProxyInvoker

在AbstractProxyInvoker里面,doInvoker本質上呼叫的是wrapper.invokeMethod()

return new AbstractProxyInvoker<T>(proxy, type, url) {
      @Override
      protected Object doInvoke(T proxy, String methodName,
                   Class<?>[] parameterTypes,
                   Object[] arguments) throws Throwable {
        return wrapper.invokeMethod(proxy, methodName, parameterTypes,arguments);
     }
   };

而Wrapper是一個動態代理類,它的定義是這樣的, 最終呼叫w.sayHello()方法進行處理

到此為止,服務端的處理程序就分析完了,

在這里插入圖片描述

十六,性能調優相關引數

1.常用的性能調優引數

引數名作用范圍說明默認值備注
threadsprovider業務處理執行緒池大小200
iothreadsproviderio執行緒池大小cpu個數+1
queuesprovider執行緒池佇列大小,當執行緒池滿時,排隊等待執行的佇列大小,建議不要設定,當執行緒池時應立即失敗,重試其他服務提供機器,而不是排隊,除非有特殊需求,0
connectionsconsumer對每個提供者的最大連接數,rmi,http,hession等短連接協議表示限制連接數,Dubbo等長連接表示簡歷的長連接個數0Dubbo協議默認共享一個長連接
activesconsumer每個服務消費者每個方法的對大并發呼叫數00表示不限制
acceptsprovider服務提供方最大可接收連接數00表示不限制
executesprovider服務提供者每個服務每個方法最大可并行執行請求數00表示不限制

2.各個引數的作用

在這里插入圖片描述

  1. 當consumer發起一個請求時,首先經過active limit(引數actives)進行方法級別的限制,其實作方式為CHM中存放計數器(AtomicInteger),請求時加1,請求完成(包括例外)減1,如果超過actives則等待有其他請求完成后重試或者超時后失敗;
  2. 從多個連接(connections)中選擇一個連接發送資料,對于默認的netty實作來說,由于可以復用連接,默認一個連接就可以,不過如果你在壓測,且只有一個consumer,一個provider,此時適當的加大connections確實能夠增強網路傳輸能力,但線上業務由于有多個consumer多個provider,因此不建議增加connections引數;
  3. 連接到達provider時(如dubbo的初次連接),首先會判斷總連接數是否超限(acceps),超過限制連接將被拒絕;
  4. 連接成功后,具體的請求交給io thread處理,io threads雖然是處理資料的讀寫,但io部分為異步,更多的消耗的是cpu,因此iothreads默認cpu個數+1是比較合理的設定,不建議調整此引數;
  5. 資料讀取并反序列化以后,交給業務執行緒池處理,默認情況下執行緒池為fixed,且排隊佇列為0(queues),這種情況下,最大并發等于業務執行緒池大小(threads),如果希望有請求的堆積能力,可以調整queues引數,如果希望快速失敗由其他節點處理(官方推薦方式),則不修改queues,只調整threads;
  6. execute limit(引數executes)是方法級別的并發限制,原理與actives類似,只是少了等待的程序,即受限后立即失敗

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/274714.html

標籤:java

上一篇:設計模式-代理模式(靜態代理、動態代理、cglib代理)

下一篇:JVM面經

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more