目錄
- 前言
- 1. 客戶端注冊進 Nacos 注冊中心(客戶端視角)
- 1.1 Spring Cloud 提供的規范標準
- 1.2 Nacos 的自動配置類
- 1.3 監聽服務初始化事件 AbstractAutoServiceRegistration.bind()
- 1.4 注冊服務實體的邏輯 NacosServiceRegistry.register()
- 1.4.1 心跳機制 BeatReactor.addBeatInfo()
- 1.4.2 注冊服務 NamingProxy.registerService()
- 1.5 以 Open API 方式發送注冊請求
- 1.6 小結
- 2. Nacos 服務器注冊服務(服務器視角)
- 2.1 服務器接收請求 InstanceController.register()
- 2.2 在服務器控制臺注冊服務實體 ServiceManager.registerInstance()
- 2.3 創建空服務 ServiceManager.createEmptyService()
- 2.3.1 將服務添加到快取 Service.putService()
- 2.3.2 建立心跳機制 Service.init()
- 2.3.3 實作資料一致性的監聽 DelegateConsistencyServiceImpl.listen()
- 2.4 小結
- 3. 客戶端查詢所有服務實體
- 3.1 消費者客戶端向 Nacos 發出請求
- 3.2 Nacos 服務器處理請求 InstanceController.list()
- 3.2 獲取所有服務的所有資訊 InstanceController.doSrvIpxt()
- 4. 客戶端監聽 Nacos 服務器以動態獲取服務實體
- 4.1 客戶端發送請求
- 4.2 服務動態感知的原理
- 4.3 Nacos 服務器處理請求
- 5. 補充內容
- 5.1 Dubbo 的自動裝配
- 6. 原始碼結構圖總結
- 6.1 客戶端視角下的服務注冊結構圖
- 6.2 服務器視角下的服務注冊結構圖
- 6.3 客戶端查詢所有服務實體結構圖
- 最后
前言
參考資料:
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服務原理與實戰》
《B站 尚硅谷 SpringCloud 框架開發教程 周陽》
為方便理解與表達,這里把 Nacos 控制臺和 Nacos 注冊中心稱為 Nacos 服務器(就是 web 界面那個),我們撰寫的業務服務稱為 Nacso 客戶端;
Nacos 客戶端將自己注冊進 Nacos 服務器,《1. 服務如何注冊進 Nacos 注冊中心》主要從 Nacos 客戶端角度解釋如何發送資訊給 Nacos 服務器;《2. Nacos 服務器注冊服務》主要從 Nacos 服務器角度解釋注冊原理;
《3. 客戶端查詢所有服務實體》將從服務消費者和提供者的角度,解釋服務消費者如何獲取提供者的所有實體,服務消費者和提供者都是 Nacos 的客戶端;
《4. 客戶端監聽 Nacos 服務器以動態獲取服務實體》從消費者客戶端角度出發監聽 Nacos 服務器,以動態獲知提供者的變化;
1. 客戶端注冊進 Nacos 注冊中心(客戶端視角)
1.1 Spring Cloud 提供的規范標準
- 服務要注冊進 Spring Cloud 集成的 Nacos,首先要滿足 Spring Cloud 提供的規范標準;
- 在 spring-cloud-common 包中有一個類 org.springframework.cloud.client.serviceregistry.ServiceRegistry,它是Spring Cloud 提供的服務注冊的標準,集成到 Spring Cloud 中實作服務注冊的組件,都會實作該介面;
public interface ServiceRegistry<R extends Registration>{
void register(R registration);
void deregister(R registration);
void close();
void setStatus(R registration,String status);
<T> T getstatus(R registration);
}
1.2 Nacos 的自動配置類
- 在 spring-cloud-commons 包的
META-INF/spring.factories中包含自動裝配的配置資訊,即約定 Spring Cloud 啟動時,會將那些類自動注入到容器中:

- 其中 AutoServiceRegistrationAutoConfiguration(服務注冊自動配置類) 是服務注冊相關配置類,原始碼如下:
@Configuration
@Import({AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(
value = https://www.cnblogs.com/dlhjw/p/{"spring.cloud.service-registry.auto-registration.enabled"},
matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
//自動注冊類
@Autowired(required = false)
private AutoServiceRegistration autoServiceRegistration;
//自動注冊類的組態檔
@Autowired
private AutoServiceRegistrationProperties properties;
public AutoServiceRegistrationAutoConfiguration() {
}
//初始化函式
@PostConstruct
protected void init() {
if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
}
}
}
- 其中關鍵之處在于注入了一個 AutoServiceRegistration(服務注冊器) 自動注冊介面,該介面在 Spring Cloud 中有一個抽象實作類 AbstractAutoServiceRegistration(服務注冊器抽象類);
- 我們要用什么注冊中心,該注冊中心就要繼承 AbstractAutoServiceRegistration(服務注冊器抽象類) 抽象類;
- 對于 Nacos 來說,使用 NacosAutoServiceRegistration(Nacos 服務注冊器) 類繼承 AbstractAutoServiceRegistration(服務注冊器抽象類) 抽象類;

1.3 監聽服務初始化事件 AbstractAutoServiceRegistration.bind()
- 在上述中需要關注 ApplicationListener(事件監聽器) 介面,它是一種事件監聽機制,介面宣告如下:
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
void onApplicationEvent(E var1);
}
- AbstractAutoServiceRegistration(服務注冊器抽象類) 使用
AbstractAutoServiceRegistration.bind()方法實作了該介面,用來監聽 WebServerInitializedEvent(服務初始化事件);
@Override
@SuppressWarnings("deprecation")
//【斷點步入】
public void onApplicationEvent(WebServerInitializedEvent event) {
bind(event);
}
- 我們給
AbstractAutoServiceRegistration.bind()方法打上斷點,啟動服務提供者,可以發現:- Nacos 服務器上沒有服務注冊實體;
- 服務初始化已經完成;

- 【結論】說明服務注冊到 Nacos 的流程是先進行服務初始化,然后通過事件監聽機制監聽初始化事件,當初始化完成時,呼叫
AbstractAutoServiceRegistration.bind()方法將服務注冊進 Nacos 注冊中心;
1.4 注冊服務實體的邏輯 NacosServiceRegistry.register()
這里能說明什么時候服務會將自己的資訊發給 Nacos 服務器;
- 我們追進
AbstractAutoServiceRegistration.bind()方法,發現在 AbstractAutoServiceRegistration(服務注冊器抽象類) 里呼叫NacosServiceRegistry.register()方法后, Nacos 服務器上出現服務實體;

- 我們追進
NacosServiceRegistry.register()方法,方法的邏輯如下:
@Override
public void register(Registration registration) {
//判斷是否有服務 ID
if (StringUtils.isEmpty(registration.getServiceId())) {
log.warn("No service to register for nacos client...");
return;
}
String serviceId = registration.getServiceId(); //服務 ID(service-provider)
Instance instance = getNacosInstanceFromRegistration(registration); //服務實體(里面有 ip、port 等資訊)
try {
//【斷點步入】注冊的方法
namingService.registerInstance(serviceId, instance);
log.info("nacos registry, {} {}:{} register finished", serviceId,
instance.getIp(), instance.getPort());
}
catch (Exception e) {
log.error("nacos registry, {} register failed...{},", serviceId,
registration.toString(), e);
}
}

- 我們點進去
namingService.registerInstance()方法(實作邏輯在 NacosNamingService.registerInstance())得到注冊的具體邏輯,如下:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
if (instance.isEphemeral()) {
//用心跳 BeatInfo 封裝服務實體資訊
BeatInfo beatInfo = new BeatInfo();
beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
beatInfo.setIp(instance.getIp());
beatInfo.setPort(instance.getPort());
beatInfo.setCluster(instance.getClusterName());
beatInfo.setWeight(instance.getWeight());
beatInfo.setMetadata(instance.getMetadata());
beatInfo.setScheduled(false);
long instanceInterval = instance.getInstanceHeartBeatInterval();
beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
//【斷點步入 1.4.1】將 beatInfo 心跳資訊放進 beatReactor 心跳發送器(發送心跳后,Nacos 服務器仍然沒有服務實體)
this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
}
//【斷點步入 1.4.2】使用 namingProxy 命名代理方式將服務實體資訊以 POST 請求方式發送服務實體
this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
-
即注冊服務實體的邏輯分兩步:
- 先通過
beatReactor.addBeatInfo創建心跳資訊實作健康檢測; - 再以 POST 請求方式發送服務實體資訊;
- 先通過
1.4.1 心跳機制 BeatReactor.addBeatInfo()
- 我們進入
BeatReactor.addBeatInfo()方法一探心跳機制,原始碼如下:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
//【核心】定時向服務端發送一個心跳包 beatInfo
this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
//【核心】
MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
-
BeatReactor.addBeatInfo()方法主要做了兩件事:- 客戶端呼叫
ScheduledExecutorService.schedule()介面方法(靠 ScheduledThreadPoolExecutor(計劃執行緒執行器) 實作)執行定時任務,在每個任務周期內定時向服務端發送一個心跳包 beatInfo; - 然后通過
MetricsMonitor.getDom2BeatSizeMonitor()方法獲取一個 心跳測量監視器(實際為 Gauge),不斷檢測服務端的回應,如果在設定時間內沒有收到服務端的回應,則認為服務器出現了故障;
- 客戶端呼叫
-
Nacos 服務端會根據客戶端的心跳包不斷更新服務的狀態;
1.4.2 注冊服務 NamingProxy.registerService()
- 接著進入
NamingProxy.registerService()方法,原始碼如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
Map<String, String> params = new HashMap(9);
params.put("namespaceId", this.namespaceId);
params.put("serviceName", serviceName);
params.put("groupName", groupName);
params.put("clusterName", instance.getClusterName());
params.put("ip", instance.getIp());
params.put("port", String.valueOf(instance.getPort()));
params.put("weight", String.valueOf(instance.getWeight()));
params.put("enable", String.valueOf(instance.isEnabled()));
params.put("healthy", String.valueOf(instance.isHealthy()));
params.put("ephemeral", String.valueOf(instance.isEphemeral()));
params.put("metadata", JSON.toJSONString(instance.getMetadata()));
//【斷點步入】這步執行完后,Nacos 服務器才出現服務實體資訊
this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
- 該方法先對服務實體做了封裝,然后通過
NamingProxy.reqAPI()方法拼湊注冊服務的 API,NamingProxy.reqAPI()方法原始碼如下:
public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
params.put("namespaceId", this.getNamespaceId());
if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
throw new IllegalArgumentException("no server available");
} else {
Exception exception = new Exception();
if (servers != null && !servers.isEmpty()) {
Random random = new Random(System.currentTimeMillis());
int index = random.nextInt(servers.size());
for(int i = 0; i < servers.size(); ++i) {
String server = (String)servers.get(index);
try {
return this.callServer(api, params, server, method);
} catch (NacosException var11) {
exception = var11;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
} catch (Exception var12) {
exception = var12;
LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
}
index = (index + 1) % servers.size();
}
throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
} else {
int i = 0;
while(i < 3) {
try {
return this.callServer(api, params, this.nacosDomain);
} catch (Exception var13) {
exception = var13;
LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
++i;
}
}
throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
}
}
}
1.5 以 Open API 方式發送注冊請求
- 最終將以 Open API 方式發送如下請求給 Nacos 服務器:
POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'

1.6 小結
- Nacos 在先加載組態檔初始化服務,使用 ApplicationListener(事件監聽器) 監聽機制監聽該服務初始化事件,當服務初始化完成后,進入
NacosServiceRegistry.register()注冊邏輯; - 注冊原理分兩步:
- 先使用 NacosNamingService(Nacos 命名服務) 發送心跳;
- 再使用 NamingProxy(命名代理),以 POST 請求方式發送服務實體給 Nacos 服務器;
- 即 Nacos Service 必須要確保注冊的服務實體是健康的(心跳檢測),才能進行服務注冊;
2. Nacos 服務器注冊服務(服務器視角)
- Nacos 服務器的原始碼下載、啟動詳情請見《微服務架構 | 3.2 Alibaba Nacos 注冊中心》;
- 上述提到客戶端注冊服務器的最后一步是向服務器發送如下 Open API 請求:
POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'- 那么服務器的原始碼分析將從這個請求開始;
2.1 服務器接收請求 InstanceController.register()
- 服務器在 nacos-naming 模塊下 InstanceController(服務實體控制器) 里定義了一個
register方法用來處理客戶端的注冊,原始碼如下:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {
//省略其他代碼
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
//從請求體里決議出 namespaceId 命名空間(本例中是 public)
final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//決議 serviceName 服務名(本例中是 DEFAULT_GROUP@@service-provider,實際就是service-provider)
final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
final Instance instance = parseInstance(request);
//【斷點步入】在服務器控制臺注冊服務實體的方法
serviceManager.registerInstance(namespaceId, serviceName, instance);
return "ok";
}
//省略其他代碼
}

2.2 在服務器控制臺注冊服務實體 ServiceManager.registerInstance()
ServiceManager.registerInstance()方法的原始碼如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
//【斷點步入 2.3】創建空服務
createEmptyService(namespaceId, serviceName, instance.isEphemeral());
Service service = getService(namespaceId, serviceName);
if (service == null) {
throw new NacosException(NacosException.INVALID_PARAM,
"service not found, namespace: " + namespaceId + ", service: " + serviceName);
}
//添加服務實體
addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
- 這段代碼主要做了三個邏輯的內容:
createEmptyService():創建空服務(用于在 Nacos 服務器控制臺的“服務串列”中展示服務資訊),實際上是初始化一個 serviceMap,它是一個 ConcurrentHashMap 集合;getService():從 serviceMap 中根據 namespaceld 和 serviceName 得到一個服務物件;addInstance():把當前注冊的服務實體保存到 Service 中;
2.3 創建空服務 ServiceManager.createEmptyService()
- 我們一直追進去,發現最后呼叫的是
ServiceManager.createServiceIfAbsent()方法,原始碼如下:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
//通過命名空間 ID 和服務名從快取中獲取 service 服務,第一次是沒有的,進入 if 陳述句創建 service
Service service = getService(namespaceId, serviceName);
if (service == null) {
Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
service = new Service();
service.setName(serviceName);
service.setNamespaceId(namespaceId);
service.setGroupName(NamingUtils.getGroupName(serviceName));
// now validate the service. if failed, exception will be thrown
service.setLastModifiedMillis(System.currentTimeMillis());
service.recalculateChecksum();
if (cluster != null) {
cluster.setService(service);
service.getClusterMap().put(cluster.getName(), cluster);
}
//判斷服務是否有效,不生效將拋出例外
service.validate();
//【斷點步入】添加與初始化服務
putServiceAndInit(service);
if (!local) {
addOrReplaceService(service);
}
}
}
- 第一次注冊服務時快取中肯定是沒有的,我們進入到 if 陳述句里的
ServiceManager.putServiceAndInit()方法;
private void putServiceAndInit(Service service) throws NacosException {
//【斷點步入 2.3.1 】將服務添加到快取
putService(service);
service = getService(service.getNamespaceId(), service.getName());
//【斷點步入 2.3.2 】建立心跳機制
service.init();
//【斷點步入 2.3.3 】實作資料一致性的監聽,將服務資料進行同步
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}
2.3.1 將服務添加到快取 Service.putService()
public void putService(Service service) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
synchronized (putServiceLock) {
if (!serviceMap.containsKey(service.getNamespaceId())) {
serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
}
}
}
//將 Service 保存到 serviceMap 中
serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}
2.3.2 建立心跳機制 Service.init()
public void init() {
//【斷點步入】定時檢查心跳
HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
entry.getValue().setService(this);
entry.getValue().init();
}
}
- 進入
HealthCheckReactor.scheduleCheck()方法;
public static void scheduleCheck(ClientBeatCheckTask task) {
futureMap.computeIfAbsent(task.taskKey(),
k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

- 其主要是通過創建定時任務不斷檢測當前服務下所有實體最后發送心跳包的時間,如果超時,則設定healthy為false表示服務不健康,并且發送服務變更事件;
- 心跳檢測機制可以用下圖概述:

2.3.3 實作資料一致性的監聽 DelegateConsistencyServiceImpl.listen()
@Override
public void listen(String key, RecordListener listener) throws NacosException {
// 這個鍵 key 會被兩個服務監聽
if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
//持久一致性服務
persistentConsistencyService.listen(key, listener);
//短暫一致性服務
ephemeralConsistencyService.listen(key, listener);
return;
}
mapConsistencyService(key).listen(key, listener);
}
2.4 小結
-
Nacos 客戶端通過 Open API 的形式發送服務注冊請求,服務端收到請求后,做以下三件事:
- 構建一個 Service 物件保存到 ConcurrentHashMap 集合中;
- 使用定時任務對當前服務下的所有實體建立心跳檢測機制;
- 基于資料一致性協議將服務資料進行同步;
-
至此,基于 Nacos 客戶端和服務端的服務注冊原理決議可以告一段落了,下面是一些補充內容;
3. 客戶端查詢所有服務實體
- 在進行 RPC 呼叫時,根據對服務的需求不同可以將 Nacos 客戶端分為兩種角色,分別是服務消費者和服務提供者;
- 服務消費者需要使用提供者的服務,就先要向 Nacos 服務器獲取所有已注冊的服務提供者實體,第 3 點將討論客戶端(消費者)如何獲取這些實體(提供者);
3.1 消費者客戶端向 Nacos 發出請求
-
與客戶端向 Nacos 服務器注冊服務一樣,消費者客戶端想要查詢提供者的所有實體,也要向 Nacos 服務器發送請求;
-
向 Nacos 服務器發送請求也是由兩種形式,Open API 和 SDK,其中 SDK 最終也是以 Open API 方式發送的:
- Open API:
GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider - SDK:
List<Instance> selectInstances(String serviceName, boolean healthy)throws NacosException;
- Open API:
-
我們使用 postman 發送
GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider請求,來模擬消費者客戶端獲取提供者實體;

- 這里主要多發送幾次請求,因為 Nacos 服務器可能快取了所有提供者實體資訊,可能直接從快取中拿而不是處理請求;
3.2 Nacos 服務器處理請求 InstanceController.list()
- Nacos 服務器處理獲取所有服務實體的控制器還是 nacos-naming 模塊下的 InstanceController(服務實體控制器);
- InstanceController(服務實體控制器) 提供了一個
InstanceController.list()介面處理該請求:

- 可以看到我們的斷點也被捕獲;
- 原始碼如下:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
//決議命名空間 ID
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
//決議服務名
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
//決議獲取一堆資訊
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
//【斷點步入 3.2】獲取所有服務的所有資訊
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
healthyOnly);
}
3.2 獲取所有服務的所有資訊 InstanceController.doSrvIpxt()
InstanceController.doSrvIpxt()方法很長,筆者刪去一些非重點原始碼方便理解:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
//省略一些非核心代碼
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
//根據 namespaceld、serviceName 獲得 Service 實體
Service service = serviceManager.getService(namespaceId, serviceName);
List<Instance> srvedIPs;
//獲取指定服務下的所有實體,從Service實體中基于srvIPs得到所有服務提供者的實體資訊
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
//使用選擇器過濾一些 ip
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
//過濾不健康實體
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
//遍歷完成 JSON 字串的封裝
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
//洗掉不可用的實體
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
- 該方法的主要邏輯主要有三步:
- 首先獲取所有服務實體;
- 過濾一些服務實體;
- 遍歷完成 JSON 字串的封裝并回傳;
4. 客戶端監聽 Nacos 服務器以動態獲取服務實體
- 消費者客戶端角度出發監聽 Nacos 服務器,以動態獲知提供者的變化;
4.1 客戶端發送請求
- 客戶端服務有兩種呼叫方式:
void subscribe(String serviceName, EventListener listener) throws NacosException;public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe),其中 subscribe 置為 true;
4.2 服務動態感知的原理
- Nacos 客戶端有一個 HostReactor 類,它的功能是實作服務的動態更新;
- 客戶端發起事件訂閱后,在 HostReactor 中有一個 UpdateTask 執行緒,每 10s 發送一次 Pull 請求,獲得服務端最新的地址串列;
- 對于服務端,它和服務提供者的實體之間維持了心跳檢測,一旦服務提供者出現例外,則會發送一個 Push 訊息給 Nacos 客戶端,也就是服務消費者;
- 服務消費者收到請求之后,使用 HostReactor 中提供的 processServiceJSON 決議訊息,并更新本地服務地址串列;
- 原理可以用下圖總結:

4.3 Nacos 服務器處理請求
- 服務器與本篇第 3 點相同,其中服務器的邏輯在《3.2 Nacos 服務器處理請求》;
5. 補充內容
5.1 Dubbo 的自動裝配
- Spring Cloud Alibaba Dubbo 集成 Nacos 時,服務的注冊是依托 Dubbo 中的自動裝配機制完成的;
- spring-cloud-alibaba-dubbo 下的
META-INF/spring.factories檔案中自動裝配了一個和服務注冊相關的配置類 DubboServiceRegistrationNonWebApplicationAutoConfiguration(Dubbo 注冊自動配置類);

@Configuration
@ConditionalOnNotWebApplication
@ConditionalOnProperty(value = "https://www.cnblogs.com/dlhjw/p/spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(DubboServiceRegistrationAutoConfiguration.class)
@Aspect
public class DubboServiceRegistrationNonWebApplicationAutoConfiguration {
@Autowired
private ServiceRegistry serviceRegistry; //實作類為 NacosServiceRegistry
...
//監聽 ApplicationStartedEvent 事件,該事件在重繪背景關系之后,呼叫 application 命令前觸發;
@EventListener(ApplicationStartedEvent.class)
public void onApplicationStarted() {
setServerPort();
//最終呼叫 NacosServiceRegistry.registry 方法實作服務注冊
register();
}
private void register() {
if (registered) {
return;
}
//這里即 NacosServiceRegistry.registry()
serviceRegistry.register(registration);
registered = true;
}
...
}
-
NacosServiceRegistry.registry()的原理詳情請見本篇《1.4 注冊服務實體的邏輯 NacosServiceRegistry.register()》; -
由此可以得出結論:
- Dubbo 集成 Nacos 時,服務的注冊是依賴 Dubbo 的自動裝配機制;
- 而 Dubbo 的自動裝配機制是依靠
NacosServiceRegistry.register()實作的;
6. 原始碼結構圖總結
原始碼結構圖大致與本篇目錄一致;
6.1 客戶端視角下的服務注冊結構圖
- AbstractAutoServiceRegistration.bind():監聽服務初始化事件;
- NacosServiceRegistry.register():注冊服務實體;
- NacosNamingService.registerInstance():通過Nacos 命名服務注冊服務實體;
- BeatReactor.addBeatInfo():心跳機制;
- ScheduledThreadPoolExecutor.schedule():執行定時任務,發送心跳包 beatInfo;
- MetricsMonitor.getDom2BeatSizeMonitor():啟動心跳測量監視器;
- NamingProxy.registerService():以 Open API 方式發送注冊請求;
- NamingProxy.reqAPI():拼湊注冊服務的 API;
- NamingProxy.callServer():呼叫服務,發送請求;
- NamingProxy.reqAPI():拼湊注冊服務的 API;
- BeatReactor.addBeatInfo():心跳機制;
- NacosNamingService.registerInstance():通過Nacos 命名服務注冊服務實體;
- NacosServiceRegistry.register():注冊服務實體;
6.2 服務器視角下的服務注冊結構圖
- InstanceController.register():服務器接收請求;
- ServiceManager.registerInstance():在服務器控制臺注冊服務實體;
- ServiceManager.createEmptyService():創建空服務;
- ServiceManager.createServiceIfAbsent():如果空缺就創建服務;
- ServiceManager.putServiceAndInit():初始化服務;
- Service.putService():將服務添加到快取;
- Service.init():建立心跳機制;
- DelegateConsistencyServiceImpl.listen():實作資料一致性的監聽;
- ServiceManager.putServiceAndInit():初始化服務;
- ServiceManager.createServiceIfAbsent():如果空缺就創建服務;
- ServiceManager.addInstance():添加服務實體;
- KeyBuilder.buildInstanceListKey():創建服務是一致性鍵;
- DelegateConsistencyServiceImpl.put():將鍵和服務實體放 ConsistencyService 進行一致性維護;
- ServiceManager.createEmptyService():創建空服務;
- ServiceManager.registerInstance():在服務器控制臺注冊服務實體;
6.3 客戶端查詢所有服務實體結構圖
- InstanceController.list():服務器接收請求;
- InstanceController.doSrvIpxt():獲取所有服務的所有資訊;
- ServiceManager.getService():根據 namespaceld、serviceName 獲得 service 實體;
- Service.srvIPs():獲取指定 service 服務下的所有實體;
- JacksonUtils.createEmptyArrayNode():遍歷完成 JSON 字串的封裝;
- InstanceController.doSrvIpxt():獲取所有服務的所有資訊;
最后

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/415311.html
標籤:架構設計
