主頁 > 軟體設計 > 微服務架構 | *3.5 Nacos 服務注冊與發現的原始碼分析

微服務架構 | *3.5 Nacos 服務注冊與發現的原始碼分析

2022-01-19 17:18:35 軟體設計

目錄
  • 前言
  • 1. 客戶端注冊進 Nacos 注冊中心(客戶端視角)
    • 1.1 Spring Cloud 提供的規范標準
    • 1.2 Nacos 的自動配置類
    • 1.3 監聽服務初始化事件 AbstractAutoServiceRegistration.bind()
    • 1.4 注冊服務實體的邏輯 NacosServiceRegistry.register()
      • 1.4.1 心跳機制 BeatReactor.addBeatInfo()
      • 1.4.2 注冊服務 NamingProxy.registerService()
    • 1.5 以 Open API 方式發送注冊請求
    • 1.6 小結
  • 2. Nacos 服務器注冊服務(服務器視角)
    • 2.1 服務器接收請求 InstanceController.register()
    • 2.2 在服務器控制臺注冊服務實體 ServiceManager.registerInstance()
    • 2.3 創建空服務 ServiceManager.createEmptyService()
      • 2.3.1 將服務添加到快取 Service.putService()
      • 2.3.2 建立心跳機制 Service.init()
      • 2.3.3 實作資料一致性的監聽 DelegateConsistencyServiceImpl.listen()
    • 2.4 小結
  • 3. 客戶端查詢所有服務實體
    • 3.1 消費者客戶端向 Nacos 發出請求
    • 3.2 Nacos 服務器處理請求 InstanceController.list()
    • 3.2 獲取所有服務的所有資訊 InstanceController.doSrvIpxt()
  • 4. 客戶端監聽 Nacos 服務器以動態獲取服務實體
    • 4.1 客戶端發送請求
    • 4.2 服務動態感知的原理
    • 4.3 Nacos 服務器處理請求
  • 5. 補充內容
    • 5.1 Dubbo 的自動裝配
  • 6. 原始碼結構圖總結
    • 6.1 客戶端視角下的服務注冊結構圖
    • 6.2 服務器視角下的服務注冊結構圖
    • 6.3 客戶端查詢所有服務實體結構圖
  • 最后


前言

參考資料
《Spring Microservices in Action》
《Spring Cloud Alibaba 微服務原理與實戰》
《B站 尚硅谷 SpringCloud 框架開發教程 周陽》

為方便理解與表達,這里把 Nacos 控制臺和 Nacos 注冊中心稱為 Nacos 服務器(就是 web 界面那個),我們撰寫的業務服務稱為 Nacso 客戶端;

Nacos 客戶端將自己注冊進 Nacos 服務器,《1. 服務如何注冊進 Nacos 注冊中心》主要從 Nacos 客戶端角度解釋如何發送資訊給 Nacos 服務器;《2. Nacos 服務器注冊服務》主要從 Nacos 服務器角度解釋注冊原理;

《3. 客戶端查詢所有服務實體》將從服務消費者和提供者的角度,解釋服務消費者如何獲取提供者的所有實體,服務消費者和提供者都是 Nacos 的客戶端;

《4. 客戶端監聽 Nacos 服務器以動態獲取服務實體》從消費者客戶端角度出發監聽 Nacos 服務器,以動態獲知提供者的變化;


1. 客戶端注冊進 Nacos 注冊中心(客戶端視角)

1.1 Spring Cloud 提供的規范標準

  • 服務要注冊進 Spring Cloud 集成的 Nacos,首先要滿足 Spring Cloud 提供的規范標準;
  • spring-cloud-common 包中有一個類 org.springframework.cloud.client.serviceregistry.ServiceRegistry,它是Spring Cloud 提供的服務注冊的標準,集成到 Spring Cloud 中實作服務注冊的組件,都會實作該介面;
public interface ServiceRegistry<R extends Registration>{
    void register(R registration); 
    void deregister(R registration); 
    void close(); 
    void setStatus(R registration,String status);
    <T> T getstatus(R registration);
}

1.2 Nacos 的自動配置類

  • spring-cloud-commons 包的 META-INF/spring.factories 中包含自動裝配的配置資訊,即約定 Spring Cloud 啟動時,會將那些類自動注入到容器中:

Spring Cloud 自動裝配

  • 其中 AutoServiceRegistrationAutoConfiguration(服務注冊自動配置類) 是服務注冊相關配置類,原始碼如下:
@Configuration
@Import({AutoServiceRegistrationConfiguration.class})
@ConditionalOnProperty(
    value = https://www.cnblogs.com/dlhjw/archive/2022/01/19/{"spring.cloud.service-registry.auto-registration.enabled"},
    matchIfMissing = true
)
public class AutoServiceRegistrationAutoConfiguration {
    //自動注冊類
    @Autowired(required = false)
    private AutoServiceRegistration autoServiceRegistration;
    //自動注冊類的組態檔
    @Autowired
    private AutoServiceRegistrationProperties properties;

    public AutoServiceRegistrationAutoConfiguration() {
    }

    //初始化函式
    @PostConstruct
    protected void init() {
        if (this.autoServiceRegistration == null && this.properties.isFailFast()) {
            throw new IllegalStateException("Auto Service Registration has been requested, but there is no AutoServiceRegistration bean");
        }
    }
}
  • 其中關鍵之處在于注入了一個 AutoServiceRegistration(服務注冊器) 自動注冊介面,該介面在 Spring Cloud 中有一個抽象實作類 AbstractAutoServiceRegistration(服務注冊器抽象類)
  • 我們要用什么注冊中心,該注冊中心就要繼承 AbstractAutoServiceRegistration(服務注冊器抽象類) 抽象類;
  • 對于 Nacos 來說,使用 NacosAutoServiceRegistration(Nacos 服務注冊器) 類繼承 AbstractAutoServiceRegistration(服務注冊器抽象類) 抽象類;

NacosAutoServiceRegistration 類依賴結構圖

1.3 監聽服務初始化事件 AbstractAutoServiceRegistration.bind()

  • 在上述中需要關注 ApplicationListener(事件監聽器) 介面,它是一種事件監聽機制,介面宣告如下:
@FunctionalInterface
public interface ApplicationListener<E extends ApplicationEvent> extends EventListener {
    void onApplicationEvent(E var1);
}
  • AbstractAutoServiceRegistration(服務注冊器抽象類) 使用 AbstractAutoServiceRegistration.bind() 方法實作了該介面,用來監聽 WebServerInitializedEvent(服務初始化事件)
@Override
@SuppressWarnings("deprecation")
//【斷點步入】
public void onApplicationEvent(WebServerInitializedEvent event) {
	bind(event);
}
  • 我們給 AbstractAutoServiceRegistration.bind() 方法打上斷點,啟動服務提供者,可以發現:
    • Nacos 服務器上沒有服務注冊實體;
    • 服務初始化已經完成;

服務初始化完成

  • 【結論】說明服務注冊到 Nacos 的流程是先進行服務初始化,然后通過事件監聽機制監聽初始化事件,當初始化完成時,呼叫 AbstractAutoServiceRegistration.bind() 方法將服務注冊進 Nacos 注冊中心;

1.4 注冊服務實體的邏輯 NacosServiceRegistry.register()

這里能說明什么時候服務會將自己的資訊發給 Nacos 服務器;

  • 我們追進 AbstractAutoServiceRegistration.bind() 方法,發現在 AbstractAutoServiceRegistration(服務注冊器抽象類) 里呼叫 NacosServiceRegistry.register() 方法后, Nacos 服務器上出現服務實體;

register()方法

  • 我們追進 NacosServiceRegistry.register() 方法,方法的邏輯如下:
@Override
public void register(Registration registration) {
    //判斷是否有服務 ID
	if (StringUtils.isEmpty(registration.getServiceId())) {
		log.warn("No service to register for nacos client...");
		return;
	}
	String serviceId = registration.getServiceId();  //服務 ID(service-provider)
	Instance instance = getNacosInstanceFromRegistration(registration);  //服務實體(里面有 ip、port 等資訊)
	try {
	    //【斷點步入】注冊的方法
		namingService.registerInstance(serviceId, instance);
		log.info("nacos registry, {} {}:{} register finished", serviceId,
				instance.getIp(), instance.getPort());
	}
	catch (Exception e) {
		log.error("nacos registry, {} register failed...{},", serviceId,
				registration.toString(), e);
	}
}

服務 ID 與服務實體資訊

  • 我們點進去 namingService.registerInstance() 方法(實作邏輯在 NacosNamingService.registerInstance())得到注冊的具體邏輯,如下:
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    if (instance.isEphemeral()) {
        //用心跳 BeatInfo 封裝服務實體資訊
        BeatInfo beatInfo = new BeatInfo();
        beatInfo.setServiceName(NamingUtils.getGroupedName(serviceName, groupName));
        beatInfo.setIp(instance.getIp());
        beatInfo.setPort(instance.getPort());
        beatInfo.setCluster(instance.getClusterName());
        beatInfo.setWeight(instance.getWeight());
        beatInfo.setMetadata(instance.getMetadata());
        beatInfo.setScheduled(false);
        long instanceInterval = instance.getInstanceHeartBeatInterval();
        beatInfo.setPeriod(instanceInterval == 0L ? DEFAULT_HEART_BEAT_INTERVAL : instanceInterval);
        //【斷點步入 1.4.1】將 beatInfo 心跳資訊放進 beatReactor 心跳發送器(發送心跳后,Nacos 服務器仍然沒有服務實體)
        this.beatReactor.addBeatInfo(NamingUtils.getGroupedName(serviceName, groupName), beatInfo);
    }
    //【斷點步入 1.4.2】使用 namingProxy 命名代理方式將服務實體資訊以 POST 請求方式發送服務實體
    this.serverProxy.registerService(NamingUtils.getGroupedName(serviceName, groupName), groupName, instance);
}
  • 即注冊服務實體的邏輯分兩步:

    • 先通過 beatReactor.addBeatInfo 創建心跳資訊實作健康檢測;
    • 再以 POST 請求方式發送服務實體資訊;

1.4.1 心跳機制 BeatReactor.addBeatInfo()

  • 我們進入 BeatReactor.addBeatInfo() 方法一探心跳機制,原始碼如下:
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    LogUtils.NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    this.dom2Beat.put(this.buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()), beatInfo);
    //【核心】定時向服務端發送一個心跳包 beatInfo
    this.executorService.schedule(new BeatReactor.BeatTask(beatInfo), 0L, TimeUnit.MILLISECONDS);
    //【核心】
    MetricsMonitor.getDom2BeatSizeMonitor().set((double)this.dom2Beat.size());
}
  • BeatReactor.addBeatInfo() 方法主要做了兩件事:

    • 客戶端呼叫 ScheduledExecutorService.schedule() 介面方法(靠 ScheduledThreadPoolExecutor(計劃執行緒執行器) 實作)執行定時任務,在每個任務周期內定時向服務端發送一個心跳包 beatInfo;
    • 然后通過 MetricsMonitor.getDom2BeatSizeMonitor() 方法獲取一個 心跳測量監視器(實際為 Gauge),不斷檢測服務端的回應,如果在設定時間內沒有收到服務端的回應,則認為服務器出現了故障;
  • Nacos 服務端會根據客戶端的心跳包不斷更新服務的狀態;

1.4.2 注冊服務 NamingProxy.registerService()

  • 接著進入 NamingProxy.registerService() 方法,原始碼如下:
public void registerService(String serviceName, String groupName, Instance instance) throws NacosException {
    LogUtils.NAMING_LOGGER.info("[REGISTER-SERVICE] {} registering service {} with instance: {}", new Object[]{this.namespaceId, serviceName, instance});
    Map<String, String> params = new HashMap(9);
    params.put("namespaceId", this.namespaceId);
    params.put("serviceName", serviceName);
    params.put("groupName", groupName);
    params.put("clusterName", instance.getClusterName());
    params.put("ip", instance.getIp());
    params.put("port", String.valueOf(instance.getPort()));
    params.put("weight", String.valueOf(instance.getWeight()));
    params.put("enable", String.valueOf(instance.isEnabled()));
    params.put("healthy", String.valueOf(instance.isHealthy()));
    params.put("ephemeral", String.valueOf(instance.isEphemeral()));
    params.put("metadata", JSON.toJSONString(instance.getMetadata()));
    //【斷點步入】這步執行完后,Nacos 服務器才出現服務實體資訊
    this.reqAPI(UtilAndComs.NACOS_URL_INSTANCE, params, (String)"POST");
}
  • 該方法先對服務實體做了封裝,然后通過 NamingProxy.reqAPI() 方法拼湊注冊服務的 API, NamingProxy.reqAPI() 方法原始碼如下:
    public String reqAPI(String api, Map<String, String> params, List<String> servers, String method) {
        params.put("namespaceId", this.getNamespaceId());
        if (CollectionUtils.isEmpty(servers) && StringUtils.isEmpty(this.nacosDomain)) {
            throw new IllegalArgumentException("no server available");
        } else {
            Exception exception = new Exception();
            if (servers != null && !servers.isEmpty()) {
                Random random = new Random(System.currentTimeMillis());
                int index = random.nextInt(servers.size());

                for(int i = 0; i < servers.size(); ++i) {
                    String server = (String)servers.get(index);

                    try {
                        return this.callServer(api, params, server, method);
                    } catch (NacosException var11) {
                        exception = var11;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var11);
                    } catch (Exception var12) {
                        exception = var12;
                        LogUtils.NAMING_LOGGER.error("request {} failed.", server, var12);
                    }
                    index = (index + 1) % servers.size();
                }

                throw new IllegalStateException("failed to req API:" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            } else {
                int i = 0;
                while(i < 3) {
                    try {
                        return this.callServer(api, params, this.nacosDomain);
                    } catch (Exception var13) {
                        exception = var13;
                        LogUtils.NAMING_LOGGER.error("[NA] req api:" + api + " failed, server(" + this.nacosDomain, var13);
                        ++i;
                    }
                }
                throw new IllegalStateException("failed to req API:/api/" + api + " after all servers(" + servers + ") tried: " + ((Exception)exception).getMessage());
            }
        }
    }

1.5 以 Open API 方式發送注冊請求

  • 最終將以 Open API 方式發送如下請求給 Nacos 服務器:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'

reqAPI() debug

1.6 小結

  • Nacos 在先加載組態檔初始化服務,使用 ApplicationListener(事件監聽器) 監聽機制監聽該服務初始化事件,當服務初始化完成后,進入 NacosServiceRegistry.register() 注冊邏輯;
  • 注冊原理分兩步:
    • 先使用 NacosNamingService(Nacos 命名服務) 發送心跳;
    • 再使用 NamingProxy(命名代理),以 POST 請求方式發送服務實體給 Nacos 服務器;
  • 即 Nacos Service 必須要確保注冊的服務實體是健康的(心跳檢測),才能進行服務注冊;

2. Nacos 服務器注冊服務(服務器視角)

  • Nacos 服務器的原始碼下載、啟動詳情請見《微服務架構 | 3.2 Alibaba Nacos 注冊中心》;
  • 上述提到客戶端注冊服務器的最后一步是向服務器發送如下 Open API 請求:
  • POST 'http://127.0.0.1:8848/nacos/v1/ns/instance?servicelame=nacos.naming.serviceName&ip=10.200.9.143&port=18082'
  • 那么服務器的原始碼分析將從這個請求開始;

2.1 服務器接收請求 InstanceController.register()

  • 服務器在 nacos-naming 模塊下 InstanceController(服務實體控制器) 里定義了一個 register 方法用來處理客戶端的注冊,原始碼如下:
@RestController
@RequestMapping(UtilsAndCommons.NACOS_NAMING_CONTEXT + "/instance")
public class InstanceController {

    //省略其他代碼

    @CanDistro
    @PostMapping
    @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
    public String register(HttpServletRequest request) throws Exception {
        //從請求體里決議出 namespaceId 命名空間(本例中是 public)
        final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
        //決議 serviceName 服務名(本例中是 DEFAULT_GROUP@@service-provider,實際就是service-provider)
        final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
        NamingUtils.checkServiceNameFormat(serviceName);
        
        final Instance instance = parseInstance(request);
        //【斷點步入】在服務器控制臺注冊服務實體的方法
        serviceManager.registerInstance(namespaceId, serviceName, instance);
        return "ok";
    }

        //省略其他代碼
}

決議命名空間和服務名

2.2 在服務器控制臺注冊服務實體 ServiceManager.registerInstance()

  • ServiceManager.registerInstance() 方法的原始碼如下:
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //【斷點步入 2.3】創建空服務
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(NacosException.INVALID_PARAM,
                "service not found, namespace: " + namespaceId + ", service: " + serviceName);
    }
    //添加服務實體
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}
  • 這段代碼主要做了三個邏輯的內容:
    • createEmptyService():創建空服務(用于在 Nacos 服務器控制臺的“服務串列”中展示服務資訊),實際上是初始化一個 serviceMap,它是一個 ConcurrentHashMap 集合;
    • getService():從 serviceMap 中根據 namespaceld 和 serviceName 得到一個服務物件;
    • addInstance():把當前注冊的服務實體保存到 Service 中;

2.3 創建空服務 ServiceManager.createEmptyService()

  • 我們一直追進去,發現最后呼叫的是 ServiceManager.createServiceIfAbsent() 方法,原始碼如下:
public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    //通過命名空間 ID 和服務名從快取中獲取 service 服務,第一次是沒有的,進入 if 陳述句創建 service
    Service service = getService(namespaceId, serviceName);
    if (service == null) {
        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        //判斷服務是否有效,不生效將拋出例外
        service.validate();
        //【斷點步入】添加與初始化服務
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}
  • 第一次注冊服務時快取中肯定是沒有的,我們進入到 if 陳述句里的 ServiceManager.putServiceAndInit() 方法;
private void putServiceAndInit(Service service) throws NacosException {
    //【斷點步入 2.3.1 】將服務添加到快取
    putService(service);
    service = getService(service.getNamespaceId(), service.getName());
    //【斷點步入 2.3.2 】建立心跳機制
    service.init();
    //【斷點步入 2.3.3 】實作資料一致性的監聽,將服務資料進行同步
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

2.3.1 將服務添加到快取 Service.putService()

public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    //將 Service 保存到 serviceMap 中
    serviceMap.get(service.getNamespaceId()).putIfAbsent(service.getName(), service);
}

2.3.2 建立心跳機制 Service.init()

public void init() {
    //【斷點步入】定時檢查心跳
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}
  • 進入 HealthCheckReactor.scheduleCheck() 方法;
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.computeIfAbsent(task.taskKey(),
            k -> GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

創建定時任務檢查心跳

  • 其主要是通過創建定時任務不斷檢測當前服務下所有實體最后發送心跳包的時間,如果超時,則設定healthy為false表示服務不健康,并且發送服務變更事件;
  • 心跳檢測機制可以用下圖概述:

心跳檢測機制

2.3.3 實作資料一致性的監聽 DelegateConsistencyServiceImpl.listen()

@Override
public void listen(String key, RecordListener listener) throws NacosException {
    
    // 這個鍵 key 會被兩個服務監聽
    if (KeyBuilder.SERVICE_META_KEY_PREFIX.equals(key)) {
        //持久一致性服務
        persistentConsistencyService.listen(key, listener);
        //短暫一致性服務
        ephemeralConsistencyService.listen(key, listener);
        return;
    }
    
    mapConsistencyService(key).listen(key, listener);
}

2.4 小結

  • Nacos 客戶端通過 Open API 的形式發送服務注冊請求,服務端收到請求后,做以下三件事:

    • 構建一個 Service 物件保存到 ConcurrentHashMap 集合中;
    • 使用定時任務對當前服務下的所有實體建立心跳檢測機制;
    • 基于資料一致性協議將服務資料進行同步;
  • 至此,基于 Nacos 客戶端和服務端的服務注冊原理決議可以告一段落了,下面是一些補充內容;


3. 客戶端查詢所有服務實體

  • 在進行 RPC 呼叫時,根據對服務的需求不同可以將 Nacos 客戶端分為兩種角色,分別是服務消費者和服務提供者;
  • 服務消費者需要使用提供者的服務,就先要向 Nacos 服務器獲取所有已注冊的服務提供者實體,第 3 點將討論客戶端(消費者)如何獲取這些實體(提供者);

3.1 消費者客戶端向 Nacos 發出請求

  • 與客戶端向 Nacos 服務器注冊服務一樣,消費者客戶端想要查詢提供者的所有實體,也要向 Nacos 服務器發送請求;

  • 向 Nacos 服務器發送請求也是由兩種形式,Open API 和 SDK,其中 SDK 最終也是以 Open API 方式發送的:

    • Open APIGET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider
    • SDKList<Instance> selectInstances(String serviceName, boolean healthy)throws NacosException;
  • 我們使用 postman 發送 GET 127.0.0.1:8848/nacos/v1/ns/instance/list?serviceName=service-provider 請求,來模擬消費者客戶端獲取提供者實體;

發送 postman 請求所有提供者服務實體

  • 這里主要多發送幾次請求,因為 Nacos 服務器可能快取了所有提供者實體資訊,可能直接從快取中拿而不是處理請求;

3.2 Nacos 服務器處理請求 InstanceController.list()

  • Nacos 服務器處理獲取所有服務實體的控制器還是 nacos-naming 模塊下的 InstanceController(服務實體控制器)
  • InstanceController(服務實體控制器) 提供了一個 InstanceController.list() 介面處理該請求:

Nacos 服務器處理獲取服務實體的請求

  • 可以看到我們的斷點也被捕獲;
  • 原始碼如下:
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    
    //決議命名空間 ID
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    //決議服務名
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    //決議獲取一堆資訊
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    //【斷點步入 3.2】獲取所有服務的所有資訊
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,
            healthyOnly);
}

3.2 獲取所有服務的所有資訊 InstanceController.doSrvIpxt()

  • InstanceController.doSrvIpxt() 方法很長,筆者刪去一些非重點原始碼方便理解:
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP,int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {

    //省略一些非核心代碼
    
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    //根據 namespaceld、serviceName 獲得 Service 實體
    Service service = serviceManager.getService(namespaceId, serviceName);
    List<Instance> srvedIPs;
    
    //獲取指定服務下的所有實體,從Service實體中基于srvIPs得到所有服務提供者的實體資訊
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    
    //使用選擇器過濾一些 ip
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    
    //過濾不健康實體
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    
    //遍歷完成 JSON 字串的封裝
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        for (Instance instance : ips) {
            //洗掉不可用的實體
            if (!instance.isEnabled()) {
                continue;
            }
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
        }
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}
  • 該方法的主要邏輯主要有三步:
    • 首先獲取所有服務實體;
    • 過濾一些服務實體;
    • 遍歷完成 JSON 字串的封裝并回傳;

4. 客戶端監聽 Nacos 服務器以動態獲取服務實體

  • 消費者客戶端角度出發監聽 Nacos 服務器,以動態獲知提供者的變化;

4.1 客戶端發送請求

  • 客戶端服務有兩種呼叫方式:
    • void subscribe(String serviceName, EventListener listener) throws NacosException
    • public List<Instance> selectInstances(String serviceName, List<String> clusters, boolean healthy, boolean subscribe),其中 subscribe 置為 true;

4.2 服務動態感知的原理

  • Nacos 客戶端有一個 HostReactor 類,它的功能是實作服務的動態更新;
  • 客戶端發起事件訂閱后,在 HostReactor 中有一個 UpdateTask 執行緒,每 10s 發送一次 Pull 請求,獲得服務端最新的地址串列;
  • 對于服務端,它和服務提供者的實體之間維持了心跳檢測,一旦服務提供者出現例外,則會發送一個 Push 訊息給 Nacos 客戶端,也就是服務消費者;
  • 服務消費者收到請求之后,使用 HostReactor 中提供的 processServiceJSON 決議訊息,并更新本地服務地址串列;
  • 原理可以用下圖總結:

服務動態感知的原理

4.3 Nacos 服務器處理請求

  • 服務器與本篇第 3 點相同,其中服務器的邏輯在《3.2 Nacos 服務器處理請求》;

5. 補充內容

5.1 Dubbo 的自動裝配

  • Spring Cloud Alibaba Dubbo 集成 Nacos 時,服務的注冊是依托 Dubbo 中的自動裝配機制完成的;
  • spring-cloud-alibaba-dubbo 下的 META-INF/spring.factories 檔案中自動裝配了一個和服務注冊相關的配置類 DubboServiceRegistrationNonWebApplicationAutoConfiguration(Dubbo 注冊自動配置類)

Dubbo 的自動組態檔

@Configuration
@ConditionalOnNotWebApplication
@ConditionalOnProperty(value = "https://www.cnblogs.com/dlhjw/archive/2022/01/19/spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter(DubboServiceRegistrationAutoConfiguration.class)
@Aspect
public class DubboServiceRegistrationNonWebApplicationAutoConfiguration {

	@Autowired
	private ServiceRegistry serviceRegistry; //實作類為 NacosServiceRegistry
    ...

	//監聽 ApplicationStartedEvent 事件,該事件在重繪背景關系之后,呼叫 application 命令前觸發;
	@EventListener(ApplicationStartedEvent.class)
	public void onApplicationStarted() {
		setServerPort();
		//最終呼叫 NacosServiceRegistry.registry 方法實作服務注冊
		register();
	}
	private void register() {
		if (registered) {
			return;
		}
		//這里即 NacosServiceRegistry.registry()
		serviceRegistry.register(registration);
		registered = true;
	}
	...
}
  • NacosServiceRegistry.registry() 的原理詳情請見本篇《1.4 注冊服務實體的邏輯 NacosServiceRegistry.register()》;

  • 由此可以得出結論

    • Dubbo 集成 Nacos 時,服務的注冊是依賴 Dubbo 的自動裝配機制;
    • 而 Dubbo 的自動裝配機制是依靠 NacosServiceRegistry.register() 實作的;

6. 原始碼結構圖總結

原始碼結構圖大致與本篇目錄一致;

6.1 客戶端視角下的服務注冊結構圖

  • AbstractAutoServiceRegistration.bind():監聽服務初始化事件;
    • NacosServiceRegistry.register():注冊服務實體;
      • NacosNamingService.registerInstance():通過Nacos 命名服務注冊服務實體;
        • BeatReactor.addBeatInfo():心跳機制;
          • ScheduledThreadPoolExecutor.schedule():執行定時任務,發送心跳包 beatInfo;
          • MetricsMonitor.getDom2BeatSizeMonitor():啟動心跳測量監視器;
        • NamingProxy.registerService():以 Open API 方式發送注冊請求;
          • NamingProxy.reqAPI():拼湊注冊服務的 API;
            • NamingProxy.callServer():呼叫服務,發送請求;

6.2 服務器視角下的服務注冊結構圖

  • InstanceController.register():服務器接收請求;
    • ServiceManager.registerInstance():在服務器控制臺注冊服務實體;
      • ServiceManager.createEmptyService():創建空服務;
        • ServiceManager.createServiceIfAbsent():如果空缺就創建服務;
          • ServiceManager.putServiceAndInit():初始化服務;
            • Service.putService():將服務添加到快取;
            • Service.init():建立心跳機制;
            • DelegateConsistencyServiceImpl.listen():實作資料一致性的監聽;
      • ServiceManager.addInstance():添加服務實體;
        • KeyBuilder.buildInstanceListKey():創建服務是一致性鍵;
        • DelegateConsistencyServiceImpl.put():將鍵和服務實體放 ConsistencyService 進行一致性維護;

6.3 客戶端查詢所有服務實體結構圖

  • InstanceController.list():服務器接收請求;
    • InstanceController.doSrvIpxt():獲取所有服務的所有資訊;
      • ServiceManager.getService():根據 namespaceld、serviceName 獲得 service 實體;
      • Service.srvIPs():獲取指定 service 服務下的所有實體;
      • JacksonUtils.createEmptyArrayNode():遍歷完成 JSON 字串的封裝;


最后

新人制作,如有錯誤,歡迎指出,感激不盡!
歡迎關注公眾號,會分享一些更日常的東西!
如需轉載,請標注出處!

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/415317.html

標籤:其他

上一篇:淺談23種設計模式之策略設計模式

下一篇:何時使用隱式引數

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more