核心功能點
【1】服務注冊:Nacos Client會通過發送REST請求的方式向Nacos Server注冊自己的服務,提供自身的元資料,比如ip地址、埠等資訊,Nacos Server接收到注冊請求后,就會把這些元資料資訊存盤在一個雙層的記憶體Map中,
【2】服務心跳:在服務注冊后,Nacos Client會維護一個定時心跳來持續通知Nacos Server,說明服務一直處于可用狀態,防止被剔除,默認5s發送一次心跳,
【3】服務同步:Nacos Server集群之間會互相同步服務實體,用來保證服務資訊的一致性,
【4】服務發現:服務消費者(Nacos Client)在呼叫服務提供者的服務時,會發送一個REST請求給Nacos Server,獲取上面注冊的服務清單,并且快取在Nacos Client本地,同時會在Nacos Client本地開啟一個定時任務定時拉取服務端最新的注冊表資訊更新到本地快取
【5】服務健康檢查:Nacos Server會開啟一個定時任務用來檢查注冊服務實體的健康情況,對于超過15s沒有收到客戶端心跳的實體會將它的healthy屬性置為false(客戶端服務發現時不會發現),如果某個實體超過30秒沒有收到心跳,直接剔除該實體(被剔除的實體如果恢復發送心跳則會重新注冊)
原始碼精髓總結
【1】注冊表的結構說明(這個僅是記錄):
//Map<namespaceId, Map<service_name, Service>【ConcurrentSkipListMap】> private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); //再分析里面的Service,Map<clusterName, Cluster> private Map<String, Cluster> clusterMap = new HashMap<>(); //再分析Cluster private Set<Instance> persistentInstances = new HashSet<>(); private Set<Instance> ephemeralInstances = new HashSet<>();
【2】分析注冊表為何要這么設計
1.注冊表是基于第一層ConcurrentHashMap,第二層ConcurrentSkipListMap,第三層HashMap,然后定位到對應的Cluster, 2.至于為什么要這樣設計,一方面是將粒度劃分的更細,通過原始碼分析可知,nacos更新注冊表是進行小范圍的更新,如定位到Cluster的臨時串列ephemeralInstances或者持久串列persistentInstances【這兩個都是set集合,所以排除了會有重復的資料】,因為粒度小所以更新速度會更快, 3.其次采用的是 寫時復制思想,也就是說,不會影響讀取的效率,因為是新開一個副本,將新舊的資料合并到一個新資料里面,然后將參考指向新資料, 4.其次是為了高擴展,對namespace進行劃分【對開發環境隔離】,對service進行劃分【對服務進行隔離】,對Cluster進行劃分【多機房部署,加快訪問速度】 5.為了解決并發讀寫問題,采用的是ConcurrentHashMap與ConcurrentSkipListMap的分段鎖,加上Cluster里面的寫時復制,其次Cluster里面是不加鎖的,因為是單執行緒進行修改,不存在沖突, 6.雖說犧牲了,一定的實時性,但是大大提高了并發的性能,
【3】分析AP架構下為什么高性能的原因
1.因為采用的是異步任務加佇列的形式來實作注冊的,所以回應很快,然后任務是慢慢做的, 2.Notifier 是在DistroConsistencyServiceImpl類中初始化,默認單執行緒,而且佇列為ArrayBlockingQueue<>(1024 * 1024), 3.縮小了變更資料的粒度,單執行緒避免了執行緒安全問題【不用加鎖】, 4.這種方式毫無疑問是會存在問題的,就是回應了但是沒有注冊上,但是對于這個問題,在客戶端里面做了心跳機制,如果檢測不到會重新注冊,
【4】分析Nacos為什么感知快的原因
采用的是客戶端定時進行一次拉取,兼服務端采用異步的形式使用UDP發送更新的資料到客戶端;
雖然UDP存在通知丟失的情況,但是每隔1s的拉取依舊能很好的保持資料的最終一致性,
原始碼分析
驗證服務端
【1】在啟動的時候我們一般是呼叫shell腳本啟動,查看startup.sh腳本
從以下看實際上是呼叫了java命令啟動了個java的專案(-jar ${BASE_DIR}/target/${SERVER}.jar 將引數對應替換后 -jar ${BASE_DIR}/target/nacos-server.jar)
去尋找啟動入口的時候會發現,它其實是SpringBoot搭建的一個WEB服務,
cygwin=false darwin=false os400=false case "`uname`" in CYGWIN*) cygwin=true;; Darwin*) darwin=true;; OS400*) os400=true;; esac error_exit () { echo "ERROR: $1 !!" exit 1 } [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java [ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java [ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME if [ -z "$JAVA_HOME" ]; then if $darwin; then if [ -x '/usr/libexec/java_home' ] ; then export JAVA_HOME=`/usr/libexec/java_home` elif [ -d "/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" ]; then export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" fi else JAVA_PATH=`dirname $(readlink -f $(which javac))` if [ "x$JAVA_PATH" != "x" ]; then export JAVA_HOME=`dirname $JAVA_PATH 2>/dev/null` fi fi if [ -z "$JAVA_HOME" ]; then error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!" fi fi export SERVER="nacos-server" export MODE="cluster" export FUNCTION_MODE="all" export MEMBER_LIST="" export EMBEDDED_STORAGE="" while getopts ":m:f:s:c:p:" opt do case $opt in m) MODE=$OPTARG;; f) FUNCTION_MODE=$OPTARG;; s) SERVER=$OPTARG;; c) MEMBER_LIST=$OPTARG;; p) EMBEDDED_STORAGE=$OPTARG;; ?) echo "Unknown parameter" exit 1;; esac done export JAVA_HOME export JAVA="$JAVA_HOME/bin/java" export BASE_DIR=`cd $(dirname $0)/..; pwd` export CUSTOM_SEARCH_LOCATIONS=file:${BASE_DIR}/conf/ #=========================================================================================== # JVM Configuration #=========================================================================================== if [[ "${MODE}" == "standalone" ]]; then JAVA_OPT="${JAVA_OPT} -Xms512m -Xmx512m -Xmn256m" JAVA_OPT="${JAVA_OPT} -Dnacos.standalone=true" else if [[ "${EMBEDDED_STORAGE}" == "embedded" ]]; then JAVA_OPT="${JAVA_OPT} -DembeddedStorage=true" fi JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m" JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof" JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages" fi if [[ "${FUNCTION_MODE}" == "config" ]]; then JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=config" elif [[ "${FUNCTION_MODE}" == "naming" ]]; then JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=naming" fi JAVA_OPT="${JAVA_OPT} -Dnacos.member.list=${MEMBER_LIST}" JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p') if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${BASE_DIR}/logs/nacos_gc.log:time,tags:filecount=10,filesize=102400" else JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext" JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M" fi JAVA_OPT="${JAVA_OPT} -Dloader.path=${BASE_DIR}/plugins/health,${BASE_DIR}/plugins/cmdb" JAVA_OPT="${JAVA_OPT} -Dnacos.home=${BASE_DIR}" JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/target/${SERVER}.jar" JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}" JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=${CUSTOM_SEARCH_LOCATIONS}" JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/nacos-logback.xml" JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288" if [ ! -d "${BASE_DIR}/logs" ]; then mkdir ${BASE_DIR}/logs fi echo "$JAVA ${JAVA_OPT}" if [[ "${MODE}" == "standalone" ]]; then echo "nacos is starting with standalone" else echo "nacos is starting with cluster" fi # check the start.out log output file if [ ! -f "${BASE_DIR}/logs/start.out" ]; then touch "${BASE_DIR}/logs/start.out" fi # start echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 & nohup $JAVA ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 & echo "nacos is starting,you can check the ${BASE_DIR}/logs/start.out"
從客戶端開始分析
【1】根據自動裝配原理(尋找spring.factories檔案配置)
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\ com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\ com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\ com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration
【2】分析NacosDiscoveryAutoConfiguration類自動裝配了什么
@Configuration @EnableConfigurationProperties @ConditionalOnNacosDiscoveryEnabled @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) @AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class }) public class NacosDiscoveryAutoConfiguration { @Bean public NacosServiceRegistry nacosServiceRegistry( NacosDiscoveryProperties nacosDiscoveryProperties) { return new NacosServiceRegistry(nacosDiscoveryProperties); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosRegistration nacosRegistration( NacosDiscoveryProperties nacosDiscoveryProperties, ApplicationContext context) { return new NacosRegistration(nacosDiscoveryProperties, context); } //可以看出是將上面兩個Bean當做引數傳入了這個Bean @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) public NacosAutoServiceRegistration nacosAutoServiceRegistration( NacosServiceRegistry registry, AutoServiceRegistrationProperties autoServiceRegistrationProperties, NacosRegistration registration) { return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration); } }
【3】分析NacosAutoServiceRegistration類有什么重要性
利用監聽機制,達到注冊服務的目的,監聽WebServer初始化事件
//class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration> //abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent> //因為繼承了ApplicationListener,必然會有監聽方法 public void onApplicationEvent(WebServerInitializedEvent event) { bind(event); } @Deprecated public void bind(WebServerInitializedEvent event) { ApplicationContext context = event.getApplicationContext(); if (context instanceof ConfigurableWebServerApplicationContext) { if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) { return; } } this.port.compareAndSet(0, event.getWebServer().getPort()); this.start(); } public void start() { if (!isEnabled()) {return; } // only initialize if nonSecurePort is greater than 0 and it isn't already running // because of containerPortInitializer below if (!this.running.get()) { this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration())); register(); if (shouldRegisterManagement()) { registerManagement(); } this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration())); this.running.compareAndSet(false, true); } } protected void register() { this.serviceRegistry.register(getRegistration()); } @Override public void register(Registration registration) { if (StringUtils.isEmpty(registration.getServiceId())) { return; } NamingService namingService = namingService(); String serviceId = registration.getServiceId(); String group = nacosDiscoveryProperties.getGroup(); Instance instance = getNacosInstanceFromRegistration(registration); try { namingService.registerInstance(serviceId, group, instance); } catch (Exception e) { // rethrow a RuntimeException if the registration is failed. // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132 rethrowRuntimeException(e); } }
【4】分析如何注冊的【服務注冊】
//NacosNamingService類的registerInstance方法 @Override public void registerInstance(String serviceName, Instance instance) throws NacosException { registerInstance(serviceName, Constants.DEFAULT_GROUP, instance); } //NacosNamingService類#registerInstance方法 @Override public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException { NamingUtils.checkInstanceIsLegal(instance); String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName); if (instance.isEphemeral()) { BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
//添加一個延時執行的心跳任務 beatReactor.addBeatInfo(groupedServiceName, beatInfo); }
//進行服務注冊 serverProxy.registerService(groupedServiceName, groupName, instance); } //NamingProxy類#registerService方法 public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { //構建注冊引數 final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //向服務端發送請求 //UtilAndComs.nacosUrlInstance=/nacos/v1/ns/instance 也就是官網所示的注冊介面地址 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); } public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return reqApi(api, params, Collections.EMPTY_MAP, method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return reqApi(api, params, body, getServerList(), method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) { throw new NacosException(...); } NacosException exception = new NacosException(); if (StringUtils.isNotBlank(nacosDomain)) { for (int i = 0; i < maxRetry; i++) { try { return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { exception = e; } } } else { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); for (int i = 0; i < servers.size(); i++) { String server = servers.get(index); try { return callServer(api, params, body, server, method); } catch (NacosException e) { exception = e; } index = (index + 1) % servers.size(); } } throw new NacosException(...); } public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0; injectSecurityInfo(params); Header header = builderHeader(); String url; if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) { url = curServer + api; } else { if (!IPUtil.containsPort(curServer)) { curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } try { //真正遠程呼叫 HttpRestResult<String> restResult = nacosRestTemplate .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start); if (restResult.ok()) { return restResult.getData(); } if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) { return StringUtils.EMPTY; } throw new NacosException(restResult.getCode(), restResult.getMessage()); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, e); } } public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception { RequestHttpEntity requestHttpEntity = new RequestHttpEntity( header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues); return execute(url, httpMethod, requestHttpEntity, responseType); } private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception { URI uri = HttpUtils.buildUri(url, requestEntity.getQuery()); ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType); HttpClientResponse response = null; try { //使用JdkHttpClientRequest去發起請求 response = this.requestClient().execute(uri, httpMethod, requestEntity); return responseHandler.handle(response); } finally { if (response != null) { response.close(); } } } //JdkHttpClientRequest類#execute方法 @Override public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception { final Object body = requestHttpEntity.getBody(); final Header headers = requestHttpEntity.getHeaders(); replaceDefaultConfig(requestHttpEntity.getHttpClientConfig()); HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection(); Map<String, String> headerMap = headers.getHeader(); if (headerMap != null && headerMap.size() > 0) { for (Map.Entry<String, String> entry : headerMap.entrySet()) { conn.setRequestProperty(entry.getKey(), entry.getValue()); } } conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis()); conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis()); conn.setRequestMethod(httpMethod); if (body != null && !"".equals(body)) { String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE); String bodyStr = JacksonUtils.toJson(body); if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) { Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class); bodyStr = HttpUtils.encodingParams(map, headers.getCharset()); } if (bodyStr != null) { conn.setDoOutput(true); byte[] b = bodyStr.getBytes(); conn.setRequestProperty("Content-Length", String.valueOf(b.length)); OutputStream outputStream = conn.getOutputStream(); outputStream.write(b, 0, b.length); outputStream.flush(); IoUtils.closeQuietly(outputStream); } } conn.connect(); return new JdkHttpClientResponse(conn); }
【5】beatReactor.addBeatInfo 心跳任務的流程【服務心跳】
//BeatReactor類#構造方法 public BeatReactor(NamingProxy serverProxy, int threadCount) { this.serverProxy = serverProxy; //定義延遲的執行緒池 this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.beat.sender"); return thread; } }); } //添加任務方法 public void addBeatInfo(String serviceName, BeatInfo beatInfo) { NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo); String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort()); BeatInfo existBeat = null; //fix #1733 if ((existBeat = dom2Beat.remove(key)) != null) { existBeat.setStopped(true); } dom2Beat.put(key, beatInfo); //實際上就是往延遲的執行緒池添加任務 executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS); MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size()); } //分析心跳任務類,主要都是run方法 //這種呼叫方式eureka中也是 class BeatTask implements Runnable { BeatInfo beatInfo; public BeatTask(BeatInfo beatInfo) { this.beatInfo = beatInfo; } @Override public void run() { if (beatInfo.isStopped()) { return; } long nextTime = beatInfo.getPeriod(); try { //呼叫server代理實體發送心跳介面 JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled); long interval = result.get("clientBeatInterval").asLong(); boolean lightBeatEnabled = false; if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) { lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean(); } BeatReactor.this.lightBeatEnabled = lightBeatEnabled; if (interval > 0) { nextTime = interval; } int code = NamingResponseCode.OK; if (result.has(CommonParams.CODE)) { code = result.get(CommonParams.CODE).asInt(); } //服務回傳沒有,則再次注冊 if (code == NamingResponseCode.RESOURCE_NOT_FOUND) { Instance instance = new Instance(); instance.setPort(beatInfo.getPort()); instance.setIp(beatInfo.getIp()); instance.setWeight(beatInfo.getWeight()); instance.setMetadata(beatInfo.getMetadata()); instance.setClusterName(beatInfo.getCluster()); instance.setServiceName(beatInfo.getServiceName()); instance.setInstanceId(instance.getInstanceId()); instance.setEphemeral(true); try { //又是一個注冊方法的呼叫 serverProxy.registerService(beatInfo.getServiceName(), NamingUtils.getGroupName(beatInfo.getServiceName()), instance); } catch (Exception ignore) { } } } catch (NacosException ex) {...} //方法內再次將任務塞入,形成回圈呼叫 executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS); } } public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException { Map<String, String> params = new HashMap<String, String>(8); Map<String, String> bodyMap = new HashMap<String, String>(2); if (!lightBeatEnabled) { bodyMap.put("beat", JacksonUtils.toJson(beatInfo)); } params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName()); params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster()); params.put("ip", beatInfo.getIp()); params.put("port", String.valueOf(beatInfo.getPort())); //地址為/nacos/v1/ns/instance/beat String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT); return JacksonUtils.toObj(result); }
【6】分析如何引入服務的【服務發現】
//NacosNamingService類#getAllInstances方法 @Override public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException { ServiceInfo serviceInfo; // 是否是訂閱模式,默認是true if (subscribe) { // 先從客戶端快取獲取服務資訊 serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } else { // 如果本地快取不存在服務資訊,則進行訂閱 serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ",")); } List<Instance> list; // 從服務資訊中獲取實體串列 if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) { return new ArrayList<Instance>(); } return list; }
【6.1】分析先從快取中拿的hostReactor.getServiceInfo方法
//獲取服務資訊 public ServiceInfo getServiceInfo(final String serviceName, final String clusters) { NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch()); String key = ServiceInfo.getKey(serviceName, clusters); if (failoverReactor.isFailoverSwitch()) { return failoverReactor.getService(key); } //獲取服務的資訊 ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters); //客戶端第一次獲取這個注冊表資訊為空 if (null == serviceObj) { serviceObj = new ServiceInfo(serviceName, clusters); serviceInfoMap.put(serviceObj.getKey(), serviceObj); updatingMap.put(serviceName, new Object()); //會去拉取這個注冊中心里面的注冊表資訊 updateServiceNow(serviceName, clusters); updatingMap.remove(serviceName); } //如果本地快取里面已有這個注冊表資訊 else if (updatingMap.containsKey(serviceName)) { if (UPDATE_HOLD_INTERVAL > 0) { // hold a moment waiting for update finish synchronized (serviceObj) { try { serviceObj.wait(UPDATE_HOLD_INTERVAL); } catch (InterruptedException e) {...} } } } //客戶端會開啟一個定時任務,每隔幾秒會去拉取注冊中心里面的全部實體的資訊 scheduleUpdateIfAbsent(serviceName, clusters); return serviceInfoMap.get(serviceObj.getKey()); } //HostReactor類# Map<String, ServiceInfo> serviceInfoMap屬性【這個便是客戶端保存實體資料的快取所在】 //實際上是先從serviceInfoMap屬性里面拿的 private ServiceInfo getServiceInfo0(String serviceName, String clusters) { String key = ServiceInfo.getKey(serviceName, clusters); return serviceInfoMap.get(key); }
【6.1.1】分析遠程拉取流程updateServiceNow方法
private void updateServiceNow(String serviceName, String clusters) { try { updateService(serviceName, clusters); } catch (NacosException e) {...} } public void updateService(String serviceName, String clusters) throws NacosException { ServiceInfo oldService = getServiceInfo0(serviceName, clusters); try { //遠程呼叫 String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false); if (StringUtils.isNotEmpty(result)) { //處理并塞入serviceInfoMap,還會發送一個InstancesChangeEvent事件 processServiceJson(result); } } finally { if (oldService != null) { synchronized (oldService) { oldService.notifyAll(); } } } } public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException { final Map<String, String> params = new HashMap<String, String>(8); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put("clusters", clusters); params.put("udpPort", String.valueOf(udpPort)); params.put("clientIP", NetUtils.localIP()); params.put("healthyOnly", String.valueOf(healthyOnly)); //呼叫服務的API,獲取服務注冊中心里面的全部實體 return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET); }
【6.1.1.1】分析定時任務scheduleUpdateIfAbsent方法做了什么
public void scheduleUpdateIfAbsent(String serviceName, String clusters) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } synchronized (futureMap) { if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) { return; } ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters)); futureMap.put(ServiceInfo.getKey(serviceName, clusters), future); } } //DEFAULT_DELAY = 1000L,也就是說是1s public synchronized ScheduledFuture<?> addTask(UpdateTask task) { return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS); } //分析UpdateTask類的run方法 @Override public void run() { long delayTime = DEFAULT_DELAY; try { // 根據serviceName獲取到當前服務的資訊,包括服務器地址串列 ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); // 如果為空,則重新拉取最新的服務串列 if (serviceObj == null) { updateService(serviceName, clusters); return; } // 如果時間戳<=上次更新的時間,則進行更新操作 if (serviceObj.getLastRefTime() <= lastRefTime) { updateService(serviceName, clusters); serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters)); } else { // 如果serviceObj的refTime更晚, // 則表示服務通過主動push機制已被更新,這時我們只進行重繪操作 refreshOnly(serviceName, clusters); } // 重繪服務的更新時間 lastRefTime = serviceObj.getLastRefTime(); // 如果訂閱被取消,則停止更新任務 if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) { return; } // 如果沒有可供呼叫的服務串列,則統計失敗次數+1 if (CollectionUtils.isEmpty(serviceObj.getHosts())) { incFailCount(); return; } // 設定延遲一段時間后進行查詢 delayTime = serviceObj.getCacheMillis(); // 將失敗查詢次數重置為0 resetFailCount(); } catch (Throwable e) { incFailCount(); } finally { // 設定下一次查詢任務的觸發時間 // 默認是1s,按照失敗次數翻倍,最大60s // 也就是【1,2,4,8,16,32,60】 executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS); } }
服務端分析
【1】分析nacos.naming.controllers包下的InstanceController
【1.1】分析注冊方法
//RESTful的介面規范 @CanDistro @PostMapping @Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE) public String register(HttpServletRequest request) throws Exception { // 嘗試獲取namespaceId final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID); // 嘗試獲取serviceName,其格式為 group_name@@service_name final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME); NamingUtils.checkServiceNameFormat(serviceName); // 決議出實體資訊,封裝為Instance物件 final Instance instance = parseInstance(request); // 注冊實體 serviceManager.registerInstance(namespaceId, serviceName, instance); return "ok"; } //ServiceManager類#registerInstance方法 public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException { //判斷本地快取中是否存在該命名空間,如果不存在就創建,之后判斷該命名空間下是否 //存在該服務,如果不存在就創建空的服務 //注意這里并沒有更新服務的實體資訊 createEmptyService(namespaceId, serviceName, instance.isEphemeral()); //從本地快取中獲取服務資訊 Service service = getService(namespaceId, serviceName); if (service == null) { throw new NacosException(...); } //服務注冊,這一步才會把服務的實體資訊和服務系結起來 addInstance(namespaceId, serviceName, instance.isEphemeral(), instance); }
【1.1.1】分析createEmptyService方法
public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException { createServiceIfAbsent(namespaceId, serviceName, local, null); } public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException { Service service = getService(namespaceId, serviceName); //沒有才會去創建 if (service == null) { Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName); service = new Service(); service.setName(serviceName); service.setNamespaceId(namespaceId); service.setGroupName(NamingUtils.getGroupName(serviceName)); // now validate the service. if failed, exception will be thrown service.setLastModifiedMillis(System.currentTimeMillis()); service.recalculateChecksum(); if (cluster != null) { cluster.setService(service); service.getClusterMap().put(cluster.getName(), cluster); } service.validate(); //將創建的空的服務插入快取,并初始化 putServiceAndInit(service); if (!local) { addOrReplaceService(service); } } } //從Map中取出,這個Map的定義 //private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>(); //感覺是不是和eureka的雙重Map存盤很相似 public Service getService(String namespaceId, String serviceName) { if (serviceMap.get(namespaceId) == null) { return null; } return chooseServiceMap(namespaceId).get(serviceName); } private void putServiceAndInit(Service service) throws NacosException { //將服務插入快取 putService(service); //對服務進行初始化 service.init(); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service); consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service); Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson()); } //這里面采用的是雙重檢查+鎖(也就是DCL【Double Check Lock】) public void putService(Service service) { if (!serviceMap.containsKey(service.getNamespaceId())) { synchronized (putServiceLock) { if (!serviceMap.containsKey(service.getNamespaceId())) { serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>()); } } } serviceMap.get(service.getNamespaceId()).put(service.getName(), service); }
【1.1.1.1】分析服務初始化流程【這里面分為兩種,一種是持久實體,一種是臨時實體】【服務心跳】
//初始化程序做了什么 public void init() { //健康檢查的執行緒添加一個心跳任務 HealthCheckReactor.scheduleCheck(clientBeatCheckTask); for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) { entry.getValue().setService(this); entry.getValue().init(); } }
【1.1.1.1.1】持久實體的處理【對于單個cluster】
public void init() { if (inited) { //這樣每個集群都只會開啟一次 return; } // 創建健康檢測的任務 checkTask = new HealthCheckTask(this); // 這里會開啟對 非臨時實體的 定時健康檢測 HealthCheckReactor.scheduleCheck(checkTask); inited = true; } //checkRtNormalized = 2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax())); public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) { task.setStartTime(System.currentTimeMillis()); //也就是延遲2000 + 5000毫秒內的亂數 return GlobalExecutor.scheduleNamingHealth(task, task.getCheckRtNormalized(), TimeUnit.MILLISECONDS); } //HealthCheckTask類#run方法 @Override public void run() { try {if (distroMapper.responsible(cluster.getService().getName()) && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) { healthCheckProcessor.process(this); } } catch (Throwable e) {...} finally { if (!cancelled) { // 結束后,再次進行任務調度,一定延遲后執行 HealthCheckReactor.scheduleCheck(this); // worst == 0 means never checked if (this.getCheckRtWorst() > 0 && switchDomain.isHealthCheckEnabled(cluster.getService().getName()) && distroMapper.responsible(cluster.getService().getName())) { // TLog doesn't support float so we must convert it into long long diff = ((this.getCheckRtLast() - this.getCheckRtLastLast()) * 10000) / this.getCheckRtLastLast(); this.setCheckRtLastLast(this.getCheckRtLast()); Cluster cluster = this.getCluster(); } } } } //TcpSuperSenseProcessor類#process方法 @Override public void process(HealthCheckTask task) { //拿出集群的持久實體 List<Instance> ips = task.getCluster().allIPs(false); if (CollectionUtils.isEmpty(ips)) { return; } for (Instance ip : ips) { if (ip.isMarked()) { continue; } if (!ip.markChecking()) { healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams()); continue; } // 封裝健康檢測資訊到 Beat Beat beat = new Beat(ip, task); // 放入一個阻塞佇列中 taskQueue.add(beat); MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet(); } } //又基于他自己本身的建構式 public TcpSuperSenseProcessor() { try { //開啟個執行緒池 selector = Selector.open(); GlobalExecutor.submitTcpCheck(this); } catch (Exception e) { throw new IllegalStateException(...); } } //TcpSuperSenseProcessor類#run方法 @Override public void run() { while (true) { try { processTask(); int readyCount = selector.selectNow(); if (readyCount <= 0) { continue; } Iterator<SelectionKey> iter = selector.selectedKeys().iterator(); while (iter.hasNext()) { SelectionKey key = iter.next(); iter.remove(); GlobalExecutor.executeTcpSuperSense(new PostProcessor(key)); } } catch (Throwable e) {...} } } //TcpSuperSenseProcessor類#processTask方法 private void processTask() throws Exception { Collection<Callable<Void>> tasks = new LinkedList<>(); do { Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS); if (beat == null) { return; } tasks.add(new TaskProcessor(beat)); } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64); // 批量處理集合中的任務 for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) { f.get(); } } private class TaskProcessor implements Callable<Void> { private static final int MAX_WAIT_TIME_MILLISECONDS = 500; Beat beat; public TaskProcessor(Beat beat) { this.beat = beat; } @Override public Void call() { // 獲取檢測任務已經等待的時長 long waited = System.currentTimeMillis() - beat.getStartTime(); if (waited > MAX_WAIT_TIME_MILLISECONDS) {...} SocketChannel channel = null; try { // 獲取實體資訊 Instance instance = beat.getIp(); BeatKey beatKey = keyMap.get(beat.toString()); if (beatKey != null && beatKey.key.isValid()) { if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) { instance.setBeingChecked(false); return null; } beatKey.key.cancel(); beatKey.key.channel().close(); } // 通過NIO建立TCP連接 channel = SocketChannel.open(); channel.configureBlocking(false); // only by setting this can we make the socket close event asynchronous channel.socket().setSoLinger(false, -1); channel.socket().setReuseAddress(true); channel.socket().setKeepAlive(true); channel.socket().setTcpNoDelay(true); Cluster cluster = beat.getTask().getCluster(); int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport(); channel.connect(new InetSocketAddress(instance.getIp(), port)); // 注冊連接、讀取事件 SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ); key.attach(beat); keyMap.put(beat.toString(), new BeatKey(key)); beat.setStartTime(System.currentTimeMillis()); //構建一個延遲500毫秒的延遲 GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS); } catch (Exception e) { beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(), "tcp:error:" + e.getMessage()); if (channel != null) { try { channel.close(); } catch (Exception ignore) { } } } return null; } }
【1.1.1.1.2】臨時實體的處理【對于整個service】
//心跳延遲5s,下一次還是5s public static void scheduleCheck(ClientBeatCheckTask task) { futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS)); } //分析clientBeatCheckTask類,既然是task必然是要研究一下run方法的 //對Service開啟心跳檢測【但是你會發現只是對臨時實體】 @Override public void run() { try {
//hash取模,也就是只允許它在一臺機器上進行檢查 if (!getDistroMapper().responsible(service.getName())) { return; } if (!getSwitchDomain().isHealthCheckEnabled()) { return; } //拿到該服務下面的所有IP【單指臨時實體】 List<Instance> instances = service.allIPs(true); // 當前時間距離上次心跳時間超過15s,則將實體健康改為false for (Instance instance : instances) { //如果沒有設定,默認值DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15); if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } //當前時間距離上次心跳時間超過30s,則將實體從記憶體中洗掉 for (Instance instance : instances) { if (instance.isMarked()) { continue; } //如果沒有設定,默認值DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30); if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance,自己向自己發起洗掉請求 deleteIp(instance); } } } catch (Exception e) {...} }
【1.1.1.1.3】匯總一波
//Nacos的健康檢測有兩種模式: //臨時實體: 采用客戶端心跳檢測模式,心跳周期5秒 心跳間隔超過15秒則標記為不健康 心跳間隔超過30秒則從服務串列洗掉 //永久實體: 采用服務端主動健康檢測方式 周期為2000 + 5000毫秒內的亂數 檢測例外只會標記為不健康,不會洗掉
【1.1.2】分析addInstance注冊方法
public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException { String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral); Service service = getService(namespaceId, serviceName); synchronized (service) { //將注冊表中已經存在的實體與當前注冊過來的實體給合并為list List<Instance> instanceList = addIpAddresses(service, ephemeral, ips); Instances instances = new Instances(); instances.setInstanceList(instanceList); //將上面合并后的實體串列更新到注冊表中 //實作類為DelegateConsistencyServiceImpl consistencyService.put(key, instances); } } //構建KEY,ephemeral默認是true,也就是一般認為都是臨時實體 public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) { return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName) : buildPersistentInstanceListKey(namespaceId, serviceName); } //臨時實體 private static String buildEphemeralInstanceListKey(String namespaceId, String serviceName) { //com.alibaba.nacos.naming.iplist.ephemeral.{namespaceId}##{serviceName} return INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName; } //持久實體 private static String buildPersistentInstanceListKey(String namespaceId, String serviceName) { //com.alibaba.nacos.naming.iplist.{namespaceId}##{serviceName} return INSTANCE_LIST_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName; } private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException { return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips); } public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException { Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral)); List<Instance> currentIPs = service.allIPs(ephemeral); Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size()); Set<String> currentInstanceIds = Sets.newHashSet(); for (Instance instance : currentIPs) { currentInstances.put(instance.toIpAddr(), instance); currentInstanceIds.add(instance.getInstanceId()); } Map<String, Instance> instanceMap; if (datum != null && null != datum.value) { instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances); } else { instanceMap = new HashMap<>(ips.length); } for (Instance instance : ips) { if (!service.getClusterMap().containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), service); cluster.init(); service.getClusterMap().put(instance.getClusterName(), cluster); } //我們這次進來的action是add,并不是remove,所以不走這里 if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) { instanceMap.remove(instance.getDatumKey()); } else { //最后就是實體放到map中,以ip+port等資訊為key,value為當前實體 Instance oldInstance = instanceMap.get(instance.getDatumKey()); if (oldInstance != null) { //覆寫 instance.setInstanceId(oldInstance.getInstanceId()); } else { //新增 instance.setInstanceId(instance.generateInstanceId(currentInstanceIds)); } instanceMap.put(instance.getDatumKey(), instance); } } if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) { throw new IllegalArgumentException(...); } return new ArrayList<>(instanceMap.values()); } //DelegateConsistencyServiceImpl類#put方法 @Override public void put(String key, Record value) throws NacosException { mapConsistencyService(key).put(key, value); } //private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService; //private final EphemeralConsistencyService ephemeralConsistencyService; //實際上的實作是DistroConsistencyServiceImpl //根據key值決定臨時節點的方式【AP架構】還會持久節點的方式【AP架構】 private ConsistencyService mapConsistencyService(String key) { return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService; }
【1.1.2.1】分析臨時節點的方式【AP架構模式】
//DistroConsistencyServiceImpl類#put方法 @Override public void put(String key, Record value) throws NacosException { //添加任務以及添加到記憶體中 onPut(key, value); //同步資料到所有其他節點 distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2); } public void onPut(String key, Record value) { if (KeyBuilder.matchEphemeralInstanceListKey(key)) { Datum<Instances> datum = new Datum<>(); //當前服務的所有實體 datum.value =https://www.cnblogs.com/chafry/archive/2022/10/26/ (Instances) value; datum.key = key; datum.timestamp.incrementAndGet(); //添加到dataStore里面,實際上是放入里面的dataMap里面 dataStore.put(key, datum); } if (!listeners.containsKey(key)) { return; } //添加異步任務 notifier.addTask(key, DataOperation.CHANGE); } //分析notifier類,既然繼承了Runnable介面必然有run方法
//GlobalExecutor.submitDistroNotifyTask(notifier); 在DistroConsistencyServiceImpl類中初始化,默認單執行緒 public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } //存入阻塞佇列里面 tasks.offer(Pair.with(datumKey, action)); } public int getTaskSize() { return tasks.size(); } @Override public void run() { //死回圈不斷處理佇列的資料 for (; ; ) { try { //從佇列里面拿出資料進行處理 Pair<String, DataOperation> pair = tasks.take(); handle(pair); } catch (Throwable e) {..} } } private void handle(Pair<String, DataOperation> pair) { try { //將資料還原 String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { //會呼叫Service類#onChange方法 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) {...} } } catch (Throwable e) {...} } }
【1.1.2.1.1】分析Service類#onChange方法
//Service類#onChange方法 @Override public void onChange(String key, Instances value) throws Exception { //對權重的處理,不太重要 for (Instance instance : value.getInstanceList()) { if (instance == null) { // Reject this abnormal instance list: throw new RuntimeException("got null instance " + key); } if (instance.getWeight() > 10000.0D) { instance.setWeight(10000.0D); } if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) { instance.setWeight(0.01D); } } //注冊邏輯在這里 updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key)); //生成校驗比對的哈希值碼 recalculateChecksum(); }
【1.1.2.1.1.1】分析注冊邏輯
//更新注冊表,這里采用了寫時復制思想:即將注冊表拷貝一個副本出來,更新這個副本,但是服務發現的時候還是從注冊表里獲取,待全部更新完畢再將副本替換回注冊表中,這樣就避免了注冊表的讀寫并發問題,這種方式不用加鎖,從而大大提升了性能 //真正的注冊邏輯 public void updateIPs(Collection<Instance> instances, boolean ephemeral) { Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size()); for (String clusterName : clusterMap.keySet()) { ipMap.put(clusterName, new ArrayList<>()); } for (Instance instance : instances) { try { if (instance == null) { continue; } if (StringUtils.isEmpty(instance.getClusterName())) { instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME); } if (!clusterMap.containsKey(instance.getClusterName())) { Cluster cluster = new Cluster(instance.getClusterName(), this); cluster.init(); getClusterMap().put(instance.getClusterName(), cluster); } List<Instance> clusterIPs = ipMap.get(instance.getClusterName()); if (clusterIPs == null) { clusterIPs = new LinkedList<>(); ipMap.put(instance.getClusterName(), clusterIPs); } clusterIPs.add(instance); } catch (Exception e) {...} } for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) { //make every ip mine List<Instance> entryIPs = entry.getValue(); //在這里進行注冊 clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral); } setLastModifiedMillis(System.currentTimeMillis());
//發送監聽事件 getPushService().serviceChanged(this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } } public void updateIps(List<Instance> ips, boolean ephemeral) { Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } //進行新舊合并 List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (!ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() != oldIP.isHealthy()) {...} if (ip.getWeight() != oldIP.getWeight()) {...} } } List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); //寫入注冊表的cluster里面的串列,采用參考替換 if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; } } //新舊合并形成新的串列 private List<Instance> updatedIps(Collection<Instance> newInstance, Collection<Instance> oldInstance) { List<Instance> intersects = (List<Instance>) CollectionUtils.intersection(newInstance, oldInstance); Map<String, Instance> stringIpAddressMap = new ConcurrentHashMap<>(intersects.size()); for (Instance instance : intersects) { stringIpAddressMap.put(instance.getIp() + ":" + instance.getPort(), instance); } Map<String, Integer> intersectMap = new ConcurrentHashMap<>(newInstance.size() + oldInstance.size()); Map<String, Instance> updatedInstancesMap = new ConcurrentHashMap<>(newInstance.size()); Map<String, Instance> newInstancesMap = new ConcurrentHashMap<>(newInstance.size()); for (Instance instance : oldInstance) { if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) { intersectMap.put(instance.toString(), 1); } } for (Instance instance : newInstance) { if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) { if (intersectMap.containsKey(instance.toString())) { intersectMap.put(instance.toString(), 2); } else { intersectMap.put(instance.toString(), 1); } } newInstancesMap.put(instance.toString(), instance); } for (Map.Entry<String, Integer> entry : intersectMap.entrySet()) { String key = entry.getKey(); Integer value = entry.getValue(); if (value =https://www.cnblogs.com/chafry/archive/2022/10/26/= 1) { if (newInstancesMap.containsKey(key)) { updatedInstancesMap.put(key, newInstancesMap.get(key)); } } } return new ArrayList<>(updatedInstancesMap.values()); }
【1.1.2.1.1.1.1】分析getPushService().serviceChanged(this);發送的事件是如何推送給客戶端的
//發送的事件 public void serviceChanged(Service service) { // merge some change events to reduce the push frequency: if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) { return; } this.applicationContext.publishEvent(new ServiceChangeEvent(this, service)); } //使用IDEA的全文搜索,ctrl+shift+F,找到對應onApplicationEvent(ServiceChangeEvent event) //PushService類#onApplicationEvent方法 @Override public void onApplicationEvent(ServiceChangeEvent event) { Service service = event.getService(); String serviceName = service.getName(); String namespaceId = service.getNamespaceId(); Future future = GlobalExecutor.scheduleUdpSender(() -> { try { Loggers.PUSH.info(serviceName + " is changed, add it to push queue."); ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); if (MapUtils.isEmpty(clients)) { return; } Map<String, Object> cache = new HashMap<>(16); long lastRefTime = System.nanoTime(); for (PushClient client : clients.values()) { if (client.zombie()) { Loggers.PUSH.debug("client is zombie: " + client.toString()); clients.remove(client.toString()); Loggers.PUSH.debug("client is zombie: " + client.toString()); continue; } Receiver.AckEntry ackEntry; Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString()); String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent()); byte[] compressData = https://www.cnblogs.com/chafry/archive/2022/10/26/null; Map<String, Object> data = https://www.cnblogs.com/chafry/archive/2022/10/26/null; if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) { org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key); compressData = (byte[]) (pair.getValue0()); data = (Map<String, Object>) pair.getValue1(); Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr()); } if (compressData != null) { ackEntry = prepareAckEntry(client, compressData, data, lastRefTime); } else { ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime); if (ackEntry != null) { cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data)); } } //發起請求 udpPush(ackEntry); } } catch (Exception e) {...} finally { futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName)); } }, 1000, TimeUnit.MILLISECONDS); futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future); } private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) { if (ackEntry == null) { Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null."); return null; } if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) { Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key); ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return ackEntry; } try { if (!ackMap.containsKey(ackEntry.key)) { totalPush++; } ackMap.put(ackEntry.key, ackEntry); udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis()); Loggers.PUSH.info("send udp packet: " + ackEntry.key); //UDP發送 udpSocket.send(ackEntry.origin); ackEntry.increaseRetryTime(); GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS); return ackEntry; } catch (Exception e) { ackMap.remove(ackEntry.key); udpSendTimeMap.remove(ackEntry.key); failedPush += 1; return null; } }
【1.1.2.1.1.2】分析recalculateChecksum方法如何生成比對的哈希值碼
//有了解過eureka的增量更新便應該知道,如何知道你自己拉取的資料全不全,就是靠比對這個哈希值碼 //如果服務端上的是【ABCDEF】,而你本地的是【ABCDF】,那么哈希值碼不一樣說明資料缺失了, public synchronized void recalculateChecksum() { List<Instance> ips = allIPs(); StringBuilder ipsString = new StringBuilder(); ipsString.append(getServiceString()); if (Loggers.SRV_LOG.isDebugEnabled()) { Loggers.SRV_LOG.debug("service to json: " + getServiceString()); } if (CollectionUtils.isNotEmpty(ips)) { Collections.sort(ips); } for (Instance ip : ips) { String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_" + ip.isHealthy() + "_" + ip.getClusterName(); ipsString.append(string); ipsString.append(","); } checksum = MD5Utils.md5Hex(ipsString.toString(), Constants.ENCODE); }
【1.1.2.1.2】分析集群服務新增資料同步的方法 distroProtocol.sync:
//DistroProtocol類#sync方法 public void sync(DistroKey distroKey, DataOperation action, long delay) { for (Member each : memberManager.allMembersWithoutSelf()) { DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress()); DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay); //添加任務,采用異步的方式,會有重試功能 distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask); } }
【1.1.2.2】分析持久節點的方式(這塊還有部分沒搞懂,后面補上)
【1.2】分析服務發現的呼叫
@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
NamingUtils.checkServiceNameFormat(serviceName);
String agent = WebUtils.getUserAgent(request);
String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
//上面進行引數校驗,這里開始
return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
}
//查看是如何獲取服務進行回傳的
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
ClientInfo clientInfo = new ClientInfo(agent);
ObjectNode result = JacksonUtils.createEmptyJsonNode();
Service service = serviceManager.getService(namespaceId, serviceName);
long cacheMillis = switchDomain.getDefaultCacheMillis();
// now try to enable the push
try {
//判斷是否支持UDP方式推送,不重要
if (udpPort > 0 && pushService.canEnablePush(agent)) {
pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app);
cacheMillis = switchDomain.getPushCacheMillis(serviceName);
}
} catch (Exception e) {
cacheMillis = switchDomain.getDefaultCacheMillis();
}
if (service == null) {
result.put("name", serviceName);
result.put("clusters", clusters);
result.put("cacheMillis", cacheMillis);
result.replace("hosts", JacksonUtils.createEmptyArrayNode());
return result;
}
checkIfDisabled(service);
List<Instance> srvedIPs;
//主要是在這里獲取
srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
//然后下面主要就是塞資料然后回傳
// filter ips using selector:
if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
srvedIPs = service.getSelector().select(clientIP, srvedIPs);
}
if (CollectionUtils.isEmpty(srvedIPs)) {
if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.set("hosts", JacksonUtils.createEmptyArrayNode());
result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
ipMap.put(Boolean.TRUE, new ArrayList<>());
ipMap.put(Boolean.FALSE, new ArrayList<>());
for (Instance ip : srvedIPs) {
ipMap.get(ip.isHealthy()).add(ip);
}
if (isCheck) {
result.put("reachProtectThreshold", false);
}
double threshold = service.getProtectThreshold();
if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
if (isCheck) {
result.put("reachProtectThreshold", true);
}
ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
ipMap.get(Boolean.FALSE).clear();
}
if (isCheck) {
result.put("protectThreshold", service.getProtectThreshold());
result.put("reachLocalSiteCallThreshold", false);
return JacksonUtils.createEmptyJsonNode();
}
ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
List<Instance> ips = entry.getValue();
if (healthyOnly && !entry.getKey()) {
continue;
}
for (Instance instance : ips) {
// remove disabled instance:
if (!instance.isEnabled()) {
continue;
}
ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
ipObj.put("ip", instance.getIp());
ipObj.put("port", instance.getPort());
// deprecated since nacos 1.0.0:
ipObj.put("valid", entry.getKey());
ipObj.put("healthy", entry.getKey());
ipObj.put("marked", instance.isMarked());
ipObj.put("instanceId", instance.getInstanceId());
ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
ipObj.put("enabled", instance.isEnabled());
ipObj.put("weight", instance.getWeight());
ipObj.put("clusterName", instance.getClusterName());
if (clientInfo.type == ClientInfo.ClientType.JAVA
&& clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
ipObj.put("serviceName", instance.getServiceName());
} else {
ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
}
ipObj.put("ephemeral", instance.isEphemeral());
hosts.add(ipObj);
}
}
result.replace("hosts", hosts);
if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
result.put("dom", serviceName);
} else {
result.put("dom", NamingUtils.getServiceName(serviceName));
}
result.put("name", serviceName);
result.put("cacheMillis", cacheMillis);
result.put("lastRefTime", System.currentTimeMillis());
result.put("checksum", service.getChecksum());
result.put("useSpecifiedURL", false);
result.put("clusters", clusters);
result.put("env", env);
result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
return result;
}
【1.2.1】分析service.srvIPs方法在服務發現的如何獲取所有實體
public List<Instance> srvIPs(List<String> clusters) { if (CollectionUtils.isEmpty(clusters)) { clusters = new ArrayList<>(); clusters.addAll(clusterMap.keySet()); } return allIPs(clusters); } public List<Instance> allIPs(List<String> clusters) { List<Instance> result = new ArrayList<>(); for (String cluster : clusters) { Cluster clusterObj = clusterMap.get(cluster); if (clusterObj == null) { continue; } //將臨時實體和持久實體一起回傳 result.addAll(clusterObj.allIPs()); } return result; } public List<Instance> allIPs() { List<Instance> allInstances = new ArrayList<>(); allInstances.addAll(persistentInstances); allInstances.addAll(ephemeralInstances); return allInstances; }
【2】集群情況下
【2.1】集群節點狀態同步任務
@Component("serverListManager") //自動被掃描成為Bean
public class ServerListManager extends MemberChangeListener {
public ServerListManager(final SwitchDomain switchDomain, final ServerMemberManager memberManager) {
this.switchDomain = switchDomain;
this.memberManager = memberManager;
NotifyCenter.registerSubscriber(this);
this.servers = new ArrayList<>(memberManager.allMembers());
}
//初始化時候自動呼叫
@PostConstruct
public void init() {
//注冊兩個任務的方法
//集群節點狀態同步任務
GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
//本地服務更新任務
GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
}
}
private class ServerStatusReporter implements Runnable {
@Override
public void run() {
try {
if (EnvUtil.getPort() <= 0) {
return;
}
int weight = Runtime.getRuntime().availableProcessors() / 2;
if (weight <= 0) {
weight = 1;
}
long curTime = System.currentTimeMillis();
String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight
+ "\r\n";
//獲取所有節點
List<Member> allServers = getServers();
if (!contains(EnvUtil.getLocalAddress())) {
Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",
EnvUtil.getLocalAddress(), allServers);
return;
}
//遍歷
if (allServers.size() > 0 && !EnvUtil.getLocalAddress().contains(IPUtil.localHostIP())) {
for (Member server : allServers) {
//排除自己
if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
continue;
}
// This metadata information exists from 1.3.0 onwards "version"
if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
continue;
}
Message msg = new Message();
msg.setData(status);
//向介面/nacos/v1/ns/operator/server/status發送資料
synchronizer.send(server.getAddress(), msg);
}
}
} catch (Exception e) {...} finally {
//TimeUnit.SECONDS.toMillis(2),也就是每2s一次
GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
}
}
}
【2.2】注冊服務實體資訊在集群節點間同步任務
//ServiceManager類#初始化方法 @PostConstruct public void init() { //每分鐘同步一次 GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS); GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor()); if (emptyServiceAutoClean) { // delay 60s, period 20s; // This task is not recommended to be performed frequently in order to avoid // the possibility that the service cache information may just be deleted // and then created due to the heartbeat mechanism GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay, cleanEmptyServicePeriod); } try { consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this); } catch (NacosException e) {...} } //同步服務的健康狀態 private class ServiceReporter implements Runnable { @Override public void run() { try { Map<String, Set<String>> allServiceNames = getAllServiceNames(); if (allServiceNames.size() <= 0) { //ignore return; } for (String namespaceId : allServiceNames.keySet()) { ServiceChecksum checksum = new ServiceChecksum(namespaceId); for (String serviceName : allServiceNames.get(namespaceId)) { if (!distroMapper.responsible(serviceName)) { continue; } Service service = getService(namespaceId, serviceName); if (service == null || service.isEmpty()) { continue; } service.recalculateChecksum(); checksum.addItem(serviceName, service.getChecksum()); } Message msg = new Message(); msg.setData(JacksonUtils.toJson(checksum)); Collection<Member> sameSiteServers = memberManager.allMembers(); if (sameSiteServers == null || sameSiteServers.size() <= 0) { return; } for (Member server : sameSiteServers) { if (server.getAddress().equals(NetUtils.localServer())) { continue; } synchronizer.send(server.getAddress(), msg); } } } catch (Exception e) {...} finally { GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS); } } }
Nacos服務注冊表結構:Map<namespace, map<group::servicename,="" service="">>
結構展示

示例展示

Nacos核心功能原始碼架構

ConcurrentSkipListMap法
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/520612.html
標籤:其他
上一篇:SpringCloud(二) - Eureka注冊中心,feign遠程呼叫,hystrix降級和熔斷
下一篇:Scala-泛型
