主頁 > 後端開發 > 微服務組件-----Spring Cloud Alibaba 注冊中心 Nacos原始碼(1.4.x版本)分析

微服務組件-----Spring Cloud Alibaba 注冊中心 Nacos原始碼(1.4.x版本)分析

2022-10-27 06:32:46 後端開發

 

核心功能點

【1】服務注冊:Nacos Client會通過發送REST請求的方式向Nacos Server注冊自己的服務,提供自身的元資料,比如ip地址、埠等資訊,Nacos Server接收到注冊請求后,就會把這些元資料資訊存盤在一個雙層的記憶體Map中,

【2】服務心跳:在服務注冊后,Nacos Client會維護一個定時心跳來持續通知Nacos Server,說明服務一直處于可用狀態,防止被剔除,默認5s發送一次心跳

【3】服務同步:Nacos Server集群之間會互相同步服務實體,用來保證服務資訊的一致性,  

【4】服務發現:服務消費者(Nacos Client)在呼叫服務提供者的服務時,會發送一個REST請求給Nacos Server,獲取上面注冊的服務清單,并且快取在Nacos Client本地,同時會在Nacos Client本地開啟一個定時任務定時拉取服務端最新的注冊表資訊更新到本地快取

【5】服務健康檢查:Nacos Server會開啟一個定時任務用來檢查注冊服務實體的健康情況,對于超過15s沒有收到客戶端心跳的實體會將它的healthy屬性置為false(客戶端服務發現時不會發現),如果某個實體超過30秒沒有收到心跳,直接剔除該實體(被剔除的實體如果恢復發送心跳則會重新注冊)

 

原始碼精髓總結

【1】注冊表的結構說明(這個僅是記錄):

//Map<namespaceId, Map<service_name, Service>【ConcurrentSkipListMap】>
private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
//再分析里面的Service,Map<clusterName, Cluster>
private Map<String, Cluster> clusterMap = new HashMap<>();
//再分析Cluster
private Set<Instance> persistentInstances = new HashSet<>();
private Set<Instance> ephemeralInstances = new HashSet<>();

【2】分析注冊表為何要這么設計

1.注冊表是基于第一層ConcurrentHashMap,第二層ConcurrentSkipListMap,第三層HashMap,然后定位到對應的Cluster,
2.至于為什么要這樣設計,一方面是將粒度劃分的更細,通過原始碼分析可知,nacos更新注冊表是進行小范圍的更新,如定位到Cluster的臨時串列ephemeralInstances或者持久串列persistentInstances【這兩個都是set集合,所以排除了會有重復的資料】,因為粒度小所以更新速度會更快,
3.其次采用的是 寫時復制思想,也就是說,不會影響讀取的效率,因為是新開一個副本,將新舊的資料合并到一個新資料里面,然后將參考指向新資料,
4.其次是為了高擴展,對namespace進行劃分【對開發環境隔離】,對service進行劃分【對服務進行隔離】,對Cluster進行劃分【多機房部署,加快訪問速度】
5.為了解決并發讀寫問題,采用的是ConcurrentHashMap與ConcurrentSkipListMap的分段鎖,加上Cluster里面的寫時復制,其次Cluster里面是不加鎖的,因為是單執行緒進行修改,不存在沖突,
6.雖說犧牲了,一定的實時性,但是大大提高了并發的性能,

【3】分析AP架構下為什么高性能的原因

1.因為采用的是異步任務加佇列的形式來實作注冊的,所以回應很快,然后任務是慢慢做的,
2.Notifier 是在DistroConsistencyServiceImpl類中初始化,默認單執行緒,而且佇列為ArrayBlockingQueue<>(1024 * 1024),
3.縮小了變更資料的粒度,單執行緒避免了執行緒安全問題【不用加鎖】,
4.這種方式毫無疑問是會存在問題的,就是回應了但是沒有注冊上,但是對于這個問題,在客戶端里面做了心跳機制,如果檢測不到會重新注冊,

 【4】分析Nacos為什么感知快的原因

采用的是客戶端定時進行一次拉取,兼服務端采用異步的形式使用UDP發送更新的資料到客戶端;
雖然UDP存在通知丟失的情況,但是每隔1s的拉取依舊能很好的保持資料的最終一致性,

 

原始碼分析

驗證服務端

【1】在啟動的時候我們一般是呼叫shell腳本啟動,查看startup.sh腳本

  從以下看實際上是呼叫了java命令啟動了個java的專案(-jar ${BASE_DIR}/target/${SERVER}.jar 將引數對應替換后 -jar ${BASE_DIR}/target/nacos-server.jar)

  去尋找啟動入口的時候會發現,它其實是SpringBoot搭建的一個WEB服務,

cygwin=false
darwin=false
os400=false
case "`uname`" in
CYGWIN*) cygwin=true;;
Darwin*) darwin=true;;
OS400*) os400=true;;
esac
error_exit ()
{
    echo "ERROR: $1 !!"
    exit 1
}
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=$HOME/jdk/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/usr/java
[ ! -e "$JAVA_HOME/bin/java" ] && JAVA_HOME=/opt/taobao/java
[ ! -e "$JAVA_HOME/bin/java" ] && unset JAVA_HOME

if [ -z "$JAVA_HOME" ]; then
  if $darwin; then

    if [ -x '/usr/libexec/java_home' ] ; then
      export JAVA_HOME=`/usr/libexec/java_home`

    elif [ -d "/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home" ]; then
      export JAVA_HOME="/System/Library/Frameworks/JavaVM.framework/Versions/CurrentJDK/Home"
    fi
  else
    JAVA_PATH=`dirname $(readlink -f $(which javac))`
    if [ "x$JAVA_PATH" != "x" ]; then
      export JAVA_HOME=`dirname $JAVA_PATH 2>/dev/null`
    fi
  fi
  if [ -z "$JAVA_HOME" ]; then
        error_exit "Please set the JAVA_HOME variable in your environment, We need java(x64)! jdk8 or later is better!"
  fi
fi

export SERVER="nacos-server"
export MODE="cluster"
export FUNCTION_MODE="all"
export MEMBER_LIST=""
export EMBEDDED_STORAGE=""
while getopts ":m:f:s:c:p:" opt
do
    case $opt in
        m)
            MODE=$OPTARG;;
        f)
            FUNCTION_MODE=$OPTARG;;
        s)
            SERVER=$OPTARG;;
        c)
            MEMBER_LIST=$OPTARG;;
        p)
            EMBEDDED_STORAGE=$OPTARG;;
        ?)
        echo "Unknown parameter"
        exit 1;;
    esac
done

export JAVA_HOME
export JAVA="$JAVA_HOME/bin/java"
export BASE_DIR=`cd $(dirname $0)/..; pwd`
export CUSTOM_SEARCH_LOCATIONS=file:${BASE_DIR}/conf/

#===========================================================================================
# JVM Configuration
#===========================================================================================
if [[ "${MODE}" == "standalone" ]]; then
    JAVA_OPT="${JAVA_OPT} -Xms512m -Xmx512m -Xmn256m"
    JAVA_OPT="${JAVA_OPT} -Dnacos.standalone=true"
else
    if [[ "${EMBEDDED_STORAGE}" == "embedded" ]]; then
        JAVA_OPT="${JAVA_OPT} -DembeddedStorage=true"
    fi
    JAVA_OPT="${JAVA_OPT} -server -Xms2g -Xmx2g -Xmn1g -XX:MetaspaceSize=128m -XX:MaxMetaspaceSize=320m"
    JAVA_OPT="${JAVA_OPT} -XX:-OmitStackTraceInFastThrow -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=${BASE_DIR}/logs/java_heapdump.hprof"
    JAVA_OPT="${JAVA_OPT} -XX:-UseLargePages"

fi

if [[ "${FUNCTION_MODE}" == "config" ]]; then
    JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=config"
elif [[ "${FUNCTION_MODE}" == "naming" ]]; then
    JAVA_OPT="${JAVA_OPT} -Dnacos.functionMode=naming"
fi

JAVA_OPT="${JAVA_OPT} -Dnacos.member.list=${MEMBER_LIST}"

JAVA_MAJOR_VERSION=$($JAVA -version 2>&1 | sed -E -n 's/.* version "([0-9]*).*$/\1/p')
if [[ "$JAVA_MAJOR_VERSION" -ge "9" ]] ; then
  JAVA_OPT="${JAVA_OPT} -Xlog:gc*:file=${BASE_DIR}/logs/nacos_gc.log:time,tags:filecount=10,filesize=102400"
else
  JAVA_OPT="${JAVA_OPT} -Djava.ext.dirs=${JAVA_HOME}/jre/lib/ext:${JAVA_HOME}/lib/ext"
  JAVA_OPT="${JAVA_OPT} -Xloggc:${BASE_DIR}/logs/nacos_gc.log -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintGCTimeStamps -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=10 -XX:GCLogFileSize=100M"
fi

JAVA_OPT="${JAVA_OPT} -Dloader.path=${BASE_DIR}/plugins/health,${BASE_DIR}/plugins/cmdb"
JAVA_OPT="${JAVA_OPT} -Dnacos.home=${BASE_DIR}"
JAVA_OPT="${JAVA_OPT} -jar ${BASE_DIR}/target/${SERVER}.jar"
JAVA_OPT="${JAVA_OPT} ${JAVA_OPT_EXT}"
JAVA_OPT="${JAVA_OPT} --spring.config.additional-location=${CUSTOM_SEARCH_LOCATIONS}"
JAVA_OPT="${JAVA_OPT} --logging.config=${BASE_DIR}/conf/nacos-logback.xml"
JAVA_OPT="${JAVA_OPT} --server.max-http-header-size=524288"

if [ ! -d "${BASE_DIR}/logs" ]; then
  mkdir ${BASE_DIR}/logs
fi

echo "$JAVA ${JAVA_OPT}"

if [[ "${MODE}" == "standalone" ]]; then
    echo "nacos is starting with standalone"
else
    echo "nacos is starting with cluster"
fi

# check the start.out log output file
if [ ! -f "${BASE_DIR}/logs/start.out" ]; then
  touch "${BASE_DIR}/logs/start.out"
fi
# start
echo "$JAVA ${JAVA_OPT}" > ${BASE_DIR}/logs/start.out 2>&1 &
nohup $JAVA ${JAVA_OPT} nacos.nacos >> ${BASE_DIR}/logs/start.out 2>&1 &
echo "nacos is starting,you can check the ${BASE_DIR}/logs/start.out"

 

從客戶端開始分析

【1】根據自動裝配原理(尋找spring.factories檔案配置)

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  com.alibaba.cloud.nacos.NacosDiscoveryAutoConfiguration,\
  com.alibaba.cloud.nacos.ribbon.RibbonNacosAutoConfiguration,\
  com.alibaba.cloud.nacos.endpoint.NacosDiscoveryEndpointAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.NacosDiscoveryClientAutoConfiguration,\
  com.alibaba.cloud.nacos.discovery.configclient.NacosConfigServerAutoConfiguration
org.springframework.cloud.bootstrap.BootstrapConfiguration=\
  com.alibaba.cloud.nacos.discovery.configclient.NacosDiscoveryClientConfigServiceBootstrapConfiguration

【2】分析NacosDiscoveryAutoConfiguration類自動裝配了什么

@Configuration
@EnableConfigurationProperties
@ConditionalOnNacosDiscoveryEnabled
@ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
@AutoConfigureAfter({ AutoServiceRegistrationConfiguration.class, AutoServiceRegistrationAutoConfiguration.class })
public class NacosDiscoveryAutoConfiguration {

    @Bean
    public NacosServiceRegistry nacosServiceRegistry(
            NacosDiscoveryProperties nacosDiscoveryProperties) {
        return new NacosServiceRegistry(nacosDiscoveryProperties);
    }

    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosRegistration nacosRegistration(
            NacosDiscoveryProperties nacosDiscoveryProperties,
            ApplicationContext context) {
        return new NacosRegistration(nacosDiscoveryProperties, context);
    }

    //可以看出是將上面兩個Bean當做引數傳入了這個Bean
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    public NacosAutoServiceRegistration nacosAutoServiceRegistration(
            NacosServiceRegistry registry,
            AutoServiceRegistrationProperties autoServiceRegistrationProperties,
            NacosRegistration registration) {
        return new NacosAutoServiceRegistration(registry, autoServiceRegistrationProperties, registration);
    }
}

 

 【3】分析NacosAutoServiceRegistration類有什么重要性

  利用監聽機制,達到注冊服務的目的,監聽WebServer初始化事件

//class NacosAutoServiceRegistration extends AbstractAutoServiceRegistration<Registration>
//abstract class AbstractAutoServiceRegistration<R extends Registration> implements AutoServiceRegistration, ApplicationContextAware, ApplicationListener<WebServerInitializedEvent>
//因為繼承了ApplicationListener,必然會有監聽方法
public void onApplicationEvent(WebServerInitializedEvent event) {
    bind(event);
}

@Deprecated
public void bind(WebServerInitializedEvent event) {
    ApplicationContext context = event.getApplicationContext();
    if (context instanceof ConfigurableWebServerApplicationContext) {
        if ("management".equals(((ConfigurableWebServerApplicationContext) context).getServerNamespace())) {
            return;
        }
    }
    this.port.compareAndSet(0, event.getWebServer().getPort());
    this.start();
}

public void start() {
    if (!isEnabled()) {return;
    }

    // only initialize if nonSecurePort is greater than 0 and it isn't already running
    // because of containerPortInitializer below
    if (!this.running.get()) {
        this.context.publishEvent(new InstancePreRegisteredEvent(this, getRegistration()));
        register();
        if (shouldRegisterManagement()) {
            registerManagement();
        }
        this.context.publishEvent(new InstanceRegisteredEvent<>(this, getConfiguration()));
        this.running.compareAndSet(false, true);
    }

}

protected void register() {
    this.serviceRegistry.register(getRegistration());
}

@Override
public void register(Registration registration) {
    if (StringUtils.isEmpty(registration.getServiceId())) {
        return;
    }

    NamingService namingService = namingService();
    String serviceId = registration.getServiceId();
    String group = nacosDiscoveryProperties.getGroup();

    Instance instance = getNacosInstanceFromRegistration(registration);

    try {
        namingService.registerInstance(serviceId, group, instance);
    }
    catch (Exception e) {
        // rethrow a RuntimeException if the registration is failed.
        // issue : https://github.com/alibaba/spring-cloud-alibaba/issues/1132
        rethrowRuntimeException(e);
    }
}

 

【4】分析如何注冊的【服務注冊

//NacosNamingService類的registerInstance方法
@Override
public void registerInstance(String serviceName, Instance instance) throws NacosException {
    registerInstance(serviceName, Constants.DEFAULT_GROUP, instance);
}

//NacosNamingService類#registerInstance方法
@Override
public void registerInstance(String serviceName, String groupName, Instance instance) throws NacosException {
    NamingUtils.checkInstanceIsLegal(instance);
    String groupedServiceName = NamingUtils.getGroupedName(serviceName, groupName);
    if (instance.isEphemeral()) {
        BeatInfo beatInfo = beatReactor.buildBeatInfo(groupedServiceName, instance);
     //添加一個延時執行的心跳任務 beatReactor.addBeatInfo(groupedServiceName, beatInfo); }
   //進行服務注冊 serverProxy.registerService(groupedServiceName, groupName, instance); }
//NamingProxy類#registerService方法 public void registerService(String serviceName, String groupName, Instance instance) throws NacosException { //構建注冊引數 final Map<String, String> params = new HashMap<String, String>(16); params.put(CommonParams.NAMESPACE_ID, namespaceId); params.put(CommonParams.SERVICE_NAME, serviceName); params.put(CommonParams.GROUP_NAME, groupName); params.put(CommonParams.CLUSTER_NAME, instance.getClusterName()); params.put("ip", instance.getIp()); params.put("port", String.valueOf(instance.getPort())); params.put("weight", String.valueOf(instance.getWeight())); params.put("enable", String.valueOf(instance.isEnabled())); params.put("healthy", String.valueOf(instance.isHealthy())); params.put("ephemeral", String.valueOf(instance.isEphemeral())); params.put("metadata", JacksonUtils.toJson(instance.getMetadata())); //向服務端發送請求 //UtilAndComs.nacosUrlInstance=/nacos/v1/ns/instance 也就是官網所示的注冊介面地址 reqApi(UtilAndComs.nacosUrlInstance, params, HttpMethod.POST); } public String reqApi(String api, Map<String, String> params, String method) throws NacosException { return reqApi(api, params, Collections.EMPTY_MAP, method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, String method) throws NacosException { return reqApi(api, params, body, getServerList(), method); } public String reqApi(String api, Map<String, String> params, Map<String, String> body, List<String> servers, String method) throws NacosException { params.put(CommonParams.NAMESPACE_ID, getNamespaceId()); if (CollectionUtils.isEmpty(servers) && StringUtils.isBlank(nacosDomain)) { throw new NacosException(...); } NacosException exception = new NacosException(); if (StringUtils.isNotBlank(nacosDomain)) { for (int i = 0; i < maxRetry; i++) { try { return callServer(api, params, body, nacosDomain, method); } catch (NacosException e) { exception = e; } } } else { Random random = new Random(System.currentTimeMillis()); int index = random.nextInt(servers.size()); for (int i = 0; i < servers.size(); i++) { String server = servers.get(index); try { return callServer(api, params, body, server, method); } catch (NacosException e) { exception = e; } index = (index + 1) % servers.size(); } } throw new NacosException(...); } public String callServer(String api, Map<String, String> params, Map<String, String> body, String curServer, String method) throws NacosException { long start = System.currentTimeMillis(); long end = 0; injectSecurityInfo(params); Header header = builderHeader(); String url; if (curServer.startsWith(UtilAndComs.HTTPS) || curServer.startsWith(UtilAndComs.HTTP)) { url = curServer + api; } else { if (!IPUtil.containsPort(curServer)) { curServer = curServer + IPUtil.IP_PORT_SPLITER + serverPort; } url = NamingHttpClientManager.getInstance().getPrefix() + curServer + api; } try { //真正遠程呼叫 HttpRestResult<String> restResult = nacosRestTemplate .exchangeForm(url, header, Query.newInstance().initParams(params), body, method, String.class); end = System.currentTimeMillis(); MetricsMonitor.getNamingRequestMonitor(method, url, String.valueOf(restResult.getCode())).observe(end - start); if (restResult.ok()) { return restResult.getData(); } if (HttpStatus.SC_NOT_MODIFIED == restResult.getCode()) { return StringUtils.EMPTY; } throw new NacosException(restResult.getCode(), restResult.getMessage()); } catch (Exception e) { throw new NacosException(NacosException.SERVER_ERROR, e); } } public <T> HttpRestResult<T> exchangeForm(String url, Header header, Query query, Map<String, String> bodyValues, String httpMethod, Type responseType) throws Exception { RequestHttpEntity requestHttpEntity = new RequestHttpEntity( header.setContentType(MediaType.APPLICATION_FORM_URLENCODED), query, bodyValues); return execute(url, httpMethod, requestHttpEntity, responseType); } private <T> HttpRestResult<T> execute(String url, String httpMethod, RequestHttpEntity requestEntity, Type responseType) throws Exception { URI uri = HttpUtils.buildUri(url, requestEntity.getQuery()); ResponseHandler<T> responseHandler = super.selectResponseHandler(responseType); HttpClientResponse response = null; try { //使用JdkHttpClientRequest去發起請求 response = this.requestClient().execute(uri, httpMethod, requestEntity); return responseHandler.handle(response); } finally { if (response != null) { response.close(); } } } //JdkHttpClientRequest類#execute方法 @Override public HttpClientResponse execute(URI uri, String httpMethod, RequestHttpEntity requestHttpEntity) throws Exception { final Object body = requestHttpEntity.getBody(); final Header headers = requestHttpEntity.getHeaders(); replaceDefaultConfig(requestHttpEntity.getHttpClientConfig()); HttpURLConnection conn = (HttpURLConnection) uri.toURL().openConnection(); Map<String, String> headerMap = headers.getHeader(); if (headerMap != null && headerMap.size() > 0) { for (Map.Entry<String, String> entry : headerMap.entrySet()) { conn.setRequestProperty(entry.getKey(), entry.getValue()); } } conn.setConnectTimeout(this.httpClientConfig.getConTimeOutMillis()); conn.setReadTimeout(this.httpClientConfig.getReadTimeOutMillis()); conn.setRequestMethod(httpMethod); if (body != null && !"".equals(body)) { String contentType = headers.getValue(HttpHeaderConsts.CONTENT_TYPE); String bodyStr = JacksonUtils.toJson(body); if (MediaType.APPLICATION_FORM_URLENCODED.equals(contentType)) { Map<String, String> map = JacksonUtils.toObj(bodyStr, HashMap.class); bodyStr = HttpUtils.encodingParams(map, headers.getCharset()); } if (bodyStr != null) { conn.setDoOutput(true); byte[] b = bodyStr.getBytes(); conn.setRequestProperty("Content-Length", String.valueOf(b.length)); OutputStream outputStream = conn.getOutputStream(); outputStream.write(b, 0, b.length); outputStream.flush(); IoUtils.closeQuietly(outputStream); } } conn.connect(); return new JdkHttpClientResponse(conn); }

 

【5】beatReactor.addBeatInfo  心跳任務的流程【服務心跳

//BeatReactor類#構造方法
public BeatReactor(NamingProxy serverProxy, int threadCount) {
    this.serverProxy = serverProxy;
    //定義延遲的執行緒池
    this.executorService = new ScheduledThreadPoolExecutor(threadCount, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(true);
            thread.setName("com.alibaba.nacos.naming.beat.sender");
            return thread;
        }
    });
}

//添加任務方法
public void addBeatInfo(String serviceName, BeatInfo beatInfo) {
    NAMING_LOGGER.info("[BEAT] adding beat: {} to beat map.", beatInfo);
    String key = buildKey(serviceName, beatInfo.getIp(), beatInfo.getPort());
    BeatInfo existBeat = null;
    //fix #1733
    if ((existBeat = dom2Beat.remove(key)) != null) {
        existBeat.setStopped(true);
    }
    dom2Beat.put(key, beatInfo);
    //實際上就是往延遲的執行緒池添加任務
    executorService.schedule(new BeatTask(beatInfo), beatInfo.getPeriod(), TimeUnit.MILLISECONDS);
    MetricsMonitor.getDom2BeatSizeMonitor().set(dom2Beat.size());
}

//分析心跳任務類,主要都是run方法
//這種呼叫方式eureka中也是
class BeatTask implements Runnable {
    
    BeatInfo beatInfo;
    
    public BeatTask(BeatInfo beatInfo) {
        this.beatInfo = beatInfo;
    }
    
    @Override
    public void run() {
        if (beatInfo.isStopped()) {
            return;
        }
        long nextTime = beatInfo.getPeriod();
        try {
            //呼叫server代理實體發送心跳介面
            JsonNode result = serverProxy.sendBeat(beatInfo, BeatReactor.this.lightBeatEnabled);
            long interval = result.get("clientBeatInterval").asLong();
            boolean lightBeatEnabled = false;
            if (result.has(CommonParams.LIGHT_BEAT_ENABLED)) {
                lightBeatEnabled = result.get(CommonParams.LIGHT_BEAT_ENABLED).asBoolean();
            }
            BeatReactor.this.lightBeatEnabled = lightBeatEnabled;
            if (interval > 0) {
                nextTime = interval;
            }
            int code = NamingResponseCode.OK;
            if (result.has(CommonParams.CODE)) {
                code = result.get(CommonParams.CODE).asInt();
            }
            //服務回傳沒有,則再次注冊
            if (code == NamingResponseCode.RESOURCE_NOT_FOUND) {
                Instance instance = new Instance();
                instance.setPort(beatInfo.getPort());
                instance.setIp(beatInfo.getIp());
                instance.setWeight(beatInfo.getWeight());
                instance.setMetadata(beatInfo.getMetadata());
                instance.setClusterName(beatInfo.getCluster());
                instance.setServiceName(beatInfo.getServiceName());
                instance.setInstanceId(instance.getInstanceId());
                instance.setEphemeral(true);
                try {
                    //又是一個注冊方法的呼叫
                    serverProxy.registerService(beatInfo.getServiceName(),
                            NamingUtils.getGroupName(beatInfo.getServiceName()), instance);
                } catch (Exception ignore) {
                }
            }
        } catch (NacosException ex) {...}
        //方法內再次將任務塞入,形成回圈呼叫
        executorService.schedule(new BeatTask(beatInfo), nextTime, TimeUnit.MILLISECONDS);
    }
}

public JsonNode sendBeat(BeatInfo beatInfo, boolean lightBeatEnabled) throws NacosException {

    Map<String, String> params = new HashMap<String, String>(8);
    Map<String, String> bodyMap = new HashMap<String, String>(2);
    if (!lightBeatEnabled) {
        bodyMap.put("beat", JacksonUtils.toJson(beatInfo));
    }
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, beatInfo.getServiceName());
    params.put(CommonParams.CLUSTER_NAME, beatInfo.getCluster());
    params.put("ip", beatInfo.getIp());
    params.put("port", String.valueOf(beatInfo.getPort()));
    //地址為/nacos/v1/ns/instance/beat
    String result = reqApi(UtilAndComs.nacosUrlBase + "/instance/beat", params, bodyMap, HttpMethod.PUT);
    return JacksonUtils.toObj(result);
}

 

【6】分析如何引入服務的【服務發現

//NacosNamingService類#getAllInstances方法
@Override
public List<Instance> getAllInstances(String serviceName, String groupName, List<String> clusters, boolean subscribe) throws NacosException {
    
    ServiceInfo serviceInfo;
    // 是否是訂閱模式,默認是true
    if (subscribe) {
        // 先從客戶端快取獲取服務資訊
        serviceInfo = hostReactor.getServiceInfo(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    } else {
        // 如果本地快取不存在服務資訊,則進行訂閱
        serviceInfo = hostReactor.getServiceInfoDirectlyFromServer(NamingUtils.getGroupedName(serviceName, groupName), StringUtils.join(clusters, ","));
    }
    List<Instance> list;
    // 從服務資訊中獲取實體串列
    if (serviceInfo == null || CollectionUtils.isEmpty(list = serviceInfo.getHosts())) {
        return new ArrayList<Instance>();
    }
    return list;
}

 

【6.1】分析先從快取中拿的hostReactor.getServiceInfo方法

//獲取服務資訊
public ServiceInfo getServiceInfo(final String serviceName, final String clusters) {
    
    NAMING_LOGGER.debug("failover-mode: " + failoverReactor.isFailoverSwitch());
    String key = ServiceInfo.getKey(serviceName, clusters);
    if (failoverReactor.isFailoverSwitch()) {
        return failoverReactor.getService(key);
    }
    //獲取服務的資訊
    ServiceInfo serviceObj = getServiceInfo0(serviceName, clusters);
    //客戶端第一次獲取這個注冊表資訊為空
    if (null == serviceObj) {
        serviceObj = new ServiceInfo(serviceName, clusters);
        serviceInfoMap.put(serviceObj.getKey(), serviceObj);
        updatingMap.put(serviceName, new Object());

        //會去拉取這個注冊中心里面的注冊表資訊
        updateServiceNow(serviceName, clusters);
        updatingMap.remove(serviceName);
        
    } 
    //如果本地快取里面已有這個注冊表資訊
    else if (updatingMap.containsKey(serviceName)) {
        
        if (UPDATE_HOLD_INTERVAL > 0) {
            // hold a moment waiting for update finish
            synchronized (serviceObj) {
                try {
                    serviceObj.wait(UPDATE_HOLD_INTERVAL);
                } catch (InterruptedException e) {...}
            }
        }
    }
    //客戶端會開啟一個定時任務,每隔幾秒會去拉取注冊中心里面的全部實體的資訊
    scheduleUpdateIfAbsent(serviceName, clusters);
    
    return serviceInfoMap.get(serviceObj.getKey());
}

//HostReactor類# Map<String, ServiceInfo> serviceInfoMap屬性【這個便是客戶端保存實體資料的快取所在】
//實際上是先從serviceInfoMap屬性里面拿的
private ServiceInfo getServiceInfo0(String serviceName, String clusters) {
    String key = ServiceInfo.getKey(serviceName, clusters);
    return serviceInfoMap.get(key);
}

 

【6.1.1】分析遠程拉取流程updateServiceNow方法

private void updateServiceNow(String serviceName, String clusters) {
    try {
        updateService(serviceName, clusters);
    } catch (NacosException e) {...}
}

public void updateService(String serviceName, String clusters) throws NacosException {
    ServiceInfo oldService = getServiceInfo0(serviceName, clusters);
    try {
        //遠程呼叫
        String result = serverProxy.queryList(serviceName, clusters, pushReceiver.getUdpPort(), false);
        
        if (StringUtils.isNotEmpty(result)) {
            //處理并塞入serviceInfoMap,還會發送一個InstancesChangeEvent事件
            processServiceJson(result);
        }
    } finally {
        if (oldService != null) {
            synchronized (oldService) {
                oldService.notifyAll();
            }
        }
    }
}

public String queryList(String serviceName, String clusters, int udpPort, boolean healthyOnly) throws NacosException {
    
    final Map<String, String> params = new HashMap<String, String>(8);
    params.put(CommonParams.NAMESPACE_ID, namespaceId);
    params.put(CommonParams.SERVICE_NAME, serviceName);
    params.put("clusters", clusters);
    params.put("udpPort", String.valueOf(udpPort));
    params.put("clientIP", NetUtils.localIP());
    params.put("healthyOnly", String.valueOf(healthyOnly));
    //呼叫服務的API,獲取服務注冊中心里面的全部實體
    return reqApi(UtilAndComs.nacosUrlBase + "/instance/list", params, HttpMethod.GET);
}

 

【6.1.1.1】分析定時任務scheduleUpdateIfAbsent方法做了什么

public void scheduleUpdateIfAbsent(String serviceName, String clusters) {
    if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
        return;
    }
    
    synchronized (futureMap) {
        if (futureMap.get(ServiceInfo.getKey(serviceName, clusters)) != null) {
            return;
        }
        
        ScheduledFuture<?> future = addTask(new UpdateTask(serviceName, clusters));
        futureMap.put(ServiceInfo.getKey(serviceName, clusters), future);
    }
}

//DEFAULT_DELAY = 1000L,也就是說是1s
public synchronized ScheduledFuture<?> addTask(UpdateTask task) {
    return executor.schedule(task, DEFAULT_DELAY, TimeUnit.MILLISECONDS);
}

//分析UpdateTask類的run方法
@Override
public void run() {
    long delayTime = DEFAULT_DELAY;
    
    try {
        // 根據serviceName獲取到當前服務的資訊,包括服務器地址串列
        ServiceInfo serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        // 如果為空,則重新拉取最新的服務串列
        if (serviceObj == null) {
            updateService(serviceName, clusters);
            return;
        }
        // 如果時間戳<=上次更新的時間,則進行更新操作
        if (serviceObj.getLastRefTime() <= lastRefTime) {
            updateService(serviceName, clusters);
            serviceObj = serviceInfoMap.get(ServiceInfo.getKey(serviceName, clusters));
        } else {
            // 如果serviceObj的refTime更晚,
            // 則表示服務通過主動push機制已被更新,這時我們只進行重繪操作
            refreshOnly(serviceName, clusters);
        }
        // 重繪服務的更新時間
        lastRefTime = serviceObj.getLastRefTime();
        // 如果訂閱被取消,則停止更新任務
        if (!notifier.isSubscribed(serviceName, clusters) && !futureMap.containsKey(ServiceInfo.getKey(serviceName, clusters))) {
            return;
        }
        // 如果沒有可供呼叫的服務串列,則統計失敗次數+1
        if (CollectionUtils.isEmpty(serviceObj.getHosts())) {
            incFailCount();
            return;
        }
        // 設定延遲一段時間后進行查詢
        delayTime = serviceObj.getCacheMillis();
        // 將失敗查詢次數重置為0
        resetFailCount();
    } catch (Throwable e) {
        incFailCount();
    } finally {
        // 設定下一次查詢任務的觸發時間
        // 默認是1s,按照失敗次數翻倍,最大60s
        // 也就是【1,2,4,8,16,32,60】
        executor.schedule(this, Math.min(delayTime << failCount, DEFAULT_DELAY * 60), TimeUnit.MILLISECONDS);
    }
}

 

服務端分析

【1】分析nacos.naming.controllers包下的InstanceController

【1.1】分析注冊方法

//RESTful的介面規范
@CanDistro
@PostMapping
@Secured(parser = NamingResourceParser.class, action = ActionTypes.WRITE)
public String register(HttpServletRequest request) throws Exception {
    // 嘗試獲取namespaceId
    final String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    // 嘗試獲取serviceName,其格式為 group_name@@service_name
    final String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    // 決議出實體資訊,封裝為Instance物件
    final Instance instance = parseInstance(request);
    // 注冊實體
    serviceManager.registerInstance(namespaceId, serviceName, instance);
    return "ok";
}

//ServiceManager類#registerInstance方法
public void registerInstance(String namespaceId, String serviceName, Instance instance) throws NacosException {
    //判斷本地快取中是否存在該命名空間,如果不存在就創建,之后判斷該命名空間下是否
    //存在該服務,如果不存在就創建空的服務
    //注意這里并沒有更新服務的實體資訊
    createEmptyService(namespaceId, serviceName, instance.isEphemeral());
    //從本地快取中獲取服務資訊
    Service service = getService(namespaceId, serviceName);
    
    if (service == null) {
        throw new NacosException(...);
    }
    //服務注冊,這一步才會把服務的實體資訊和服務系結起來
    addInstance(namespaceId, serviceName, instance.isEphemeral(), instance);
}

【1.1.1】分析createEmptyService方法

public void createEmptyService(String namespaceId, String serviceName, boolean local) throws NacosException {
    createServiceIfAbsent(namespaceId, serviceName, local, null);
}

public void createServiceIfAbsent(String namespaceId, String serviceName, boolean local, Cluster cluster) throws NacosException {
    Service service = getService(namespaceId, serviceName);
    //沒有才會去創建
    if (service == null) {        
        Loggers.SRV_LOG.info("creating empty service {}:{}", namespaceId, serviceName);
        service = new Service();
        service.setName(serviceName);
        service.setNamespaceId(namespaceId);
        service.setGroupName(NamingUtils.getGroupName(serviceName));
        // now validate the service. if failed, exception will be thrown
        service.setLastModifiedMillis(System.currentTimeMillis());
        service.recalculateChecksum();
        if (cluster != null) {
            cluster.setService(service);
            service.getClusterMap().put(cluster.getName(), cluster);
        }
        service.validate();
        //將創建的空的服務插入快取,并初始化
        putServiceAndInit(service);
        if (!local) {
            addOrReplaceService(service);
        }
    }
}

//從Map中取出,這個Map的定義
//private final Map<String, Map<String, Service>> serviceMap = new ConcurrentHashMap<>();
//感覺是不是和eureka的雙重Map存盤很相似
public Service getService(String namespaceId, String serviceName) {
    if (serviceMap.get(namespaceId) == null) {
        return null;
    }
    return chooseServiceMap(namespaceId).get(serviceName);
}

private void putServiceAndInit(Service service) throws NacosException {
    //將服務插入快取
    putService(service);
    //對服務進行初始化
    service.init();
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), true), service);
    consistencyService.listen(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), false), service);
    Loggers.SRV_LOG.info("[NEW-SERVICE] {}", service.toJson());
}

//這里面采用的是雙重檢查+鎖(也就是DCL【Double Check Lock】)
public void putService(Service service) {
    if (!serviceMap.containsKey(service.getNamespaceId())) {
        synchronized (putServiceLock) {
            if (!serviceMap.containsKey(service.getNamespaceId())) {
                serviceMap.put(service.getNamespaceId(), new ConcurrentSkipListMap<>());
            }
        }
    }
    serviceMap.get(service.getNamespaceId()).put(service.getName(), service);
}

 

【1.1.1.1】分析服務初始化流程【這里面分為兩種,一種是持久實體,一種是臨時實體】【服務心跳

//初始化程序做了什么
public void init() {
    //健康檢查的執行緒添加一個心跳任務
    HealthCheckReactor.scheduleCheck(clientBeatCheckTask);
    for (Map.Entry<String, Cluster> entry : clusterMap.entrySet()) {
        entry.getValue().setService(this);
        entry.getValue().init();
    }
}

【1.1.1.1.1】持久實體的處理【對于單個cluster】

public void init() {
    if (inited) {  //這樣每個集群都只會開啟一次
        return;
    }
    // 創建健康檢測的任務
    checkTask = new HealthCheckTask(this);
    // 這里會開啟對 非臨時實體的 定時健康檢測
    HealthCheckReactor.scheduleCheck(checkTask);
    inited = true;
}

//checkRtNormalized = 2000 + RandomUtils.nextInt(0, RandomUtils.nextInt(0, switchDomain.getTcpHealthParams().getMax()));
public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) {
    task.setStartTime(System.currentTimeMillis());
    //也就是延遲2000 + 5000毫秒內的亂數
    return GlobalExecutor.scheduleNamingHealth(task, task.getCheckRtNormalized(), TimeUnit.MILLISECONDS);
}

//HealthCheckTask類#run方法
@Override
public void run() {
    
    try {if (distroMapper.responsible(cluster.getService().getName()) && switchDomain.isHealthCheckEnabled(cluster.getService().getName())) {
            healthCheckProcessor.process(this);
        }
    } catch (Throwable e) {...} finally {
        if (!cancelled) {
            // 結束后,再次進行任務調度,一定延遲后執行
            HealthCheckReactor.scheduleCheck(this);
            
            // worst == 0 means never checked
            if (this.getCheckRtWorst() > 0 && switchDomain.isHealthCheckEnabled(cluster.getService().getName())  && distroMapper.responsible(cluster.getService().getName())) {
                // TLog doesn't support float so we must convert it into long
                long diff = ((this.getCheckRtLast() - this.getCheckRtLastLast()) * 10000) / this.getCheckRtLastLast();
                this.setCheckRtLastLast(this.getCheckRtLast());
                Cluster cluster = this.getCluster();
            }
        }
    }
}

//TcpSuperSenseProcessor類#process方法
@Override
public void process(HealthCheckTask task) {
    //拿出集群的持久實體
    List<Instance> ips = task.getCluster().allIPs(false);
    
    if (CollectionUtils.isEmpty(ips)) {
        return;
    }
    
    for (Instance ip : ips) {       
        if (ip.isMarked()) {
            continue;
        }
        
        if (!ip.markChecking()) {
            healthCheckCommon.reEvaluateCheckRT(task.getCheckRtNormalized() * 2, task, switchDomain.getTcpHealthParams());
            continue;
        }
        // 封裝健康檢測資訊到 Beat
        Beat beat = new Beat(ip, task);
        // 放入一個阻塞佇列中
        taskQueue.add(beat);
        MetricsMonitor.getTcpHealthCheckMonitor().incrementAndGet();
    }
}

//又基于他自己本身的建構式
public TcpSuperSenseProcessor() {
    try {
        //開啟個執行緒池
        selector = Selector.open();
        GlobalExecutor.submitTcpCheck(this);
        
    } catch (Exception e) {
        throw new IllegalStateException(...);
    }
}
//TcpSuperSenseProcessor類#run方法
@Override
public void run() {
    while (true) {
        try {
            processTask();
            
            int readyCount = selector.selectNow();
            if (readyCount <= 0) {
                continue;
            }
            
            Iterator<SelectionKey> iter = selector.selectedKeys().iterator();
            while (iter.hasNext()) {
                SelectionKey key = iter.next();
                iter.remove();
                
                GlobalExecutor.executeTcpSuperSense(new PostProcessor(key));
            }
        } catch (Throwable e) {...}
    }
}

//TcpSuperSenseProcessor類#processTask方法
private void processTask() throws Exception {
    Collection<Callable<Void>> tasks = new LinkedList<>();
    do {
        Beat beat = taskQueue.poll(CONNECT_TIMEOUT_MS / 2, TimeUnit.MILLISECONDS);
        if (beat == null) {
            return;
        }
        
        tasks.add(new TaskProcessor(beat));
    } while (taskQueue.size() > 0 && tasks.size() < NIO_THREAD_COUNT * 64);
    // 批量處理集合中的任務
    for (Future<?> f : GlobalExecutor.invokeAllTcpSuperSenseTask(tasks)) {
        f.get();
    }
}

private class TaskProcessor implements Callable<Void> {
    
    private static final int MAX_WAIT_TIME_MILLISECONDS = 500;
    
    Beat beat;
    
    public TaskProcessor(Beat beat) {
        this.beat = beat;
    }
    
    @Override
    public Void call() {
        // 獲取檢測任務已經等待的時長
        long waited = System.currentTimeMillis() - beat.getStartTime();
        if (waited > MAX_WAIT_TIME_MILLISECONDS) {...}
        
        SocketChannel channel = null;
        try {
            // 獲取實體資訊
            Instance instance = beat.getIp();
            BeatKey beatKey = keyMap.get(beat.toString());
            if (beatKey != null && beatKey.key.isValid()) {
                if (System.currentTimeMillis() - beatKey.birthTime < TCP_KEEP_ALIVE_MILLIS) {
                    instance.setBeingChecked(false);
                    return null;
                }
                
                beatKey.key.cancel();
                beatKey.key.channel().close();
            }
            // 通過NIO建立TCP連接
            channel = SocketChannel.open();
            channel.configureBlocking(false);
            // only by setting this can we make the socket close event asynchronous
            channel.socket().setSoLinger(false, -1);
            channel.socket().setReuseAddress(true);
            channel.socket().setKeepAlive(true);
            channel.socket().setTcpNoDelay(true);
            
            Cluster cluster = beat.getTask().getCluster();
            int port = cluster.isUseIPPort4Check() ? instance.getPort() : cluster.getDefCkport();
            channel.connect(new InetSocketAddress(instance.getIp(), port));
            // 注冊連接、讀取事件
            SelectionKey key = channel.register(selector, SelectionKey.OP_CONNECT | SelectionKey.OP_READ);
            key.attach(beat);
            keyMap.put(beat.toString(), new BeatKey(key));
            
            beat.setStartTime(System.currentTimeMillis());
            //構建一個延遲500毫秒的延遲
            GlobalExecutor.scheduleTcpSuperSenseTask(new TimeOutTask(key), CONNECT_TIMEOUT_MS, TimeUnit.MILLISECONDS);
        } catch (Exception e) {
            beat.finishCheck(false, false, switchDomain.getTcpHealthParams().getMax(),
                    "tcp:error:" + e.getMessage());
            
            if (channel != null) {
                try {
                    channel.close();
                } catch (Exception ignore) {
                }
            }
        }
        
        return null;
    }
}

【1.1.1.1.2】臨時實體的處理【對于整個service】

//心跳延遲5s,下一次還是5s
public static void scheduleCheck(ClientBeatCheckTask task) {
    futureMap.putIfAbsent(task.taskKey(), GlobalExecutor.scheduleNamingHealth(task, 5000, 5000, TimeUnit.MILLISECONDS));
}

//分析clientBeatCheckTask類,既然是task必然是要研究一下run方法的
//對Service開啟心跳檢測【但是你會發現只是對臨時實體】
@Override
public void run() {
    try {
     //hash取模,也就是只允許它在一臺機器上進行檢查
if (!getDistroMapper().responsible(service.getName())) { return; } if (!getSwitchDomain().isHealthCheckEnabled()) { return; } //拿到該服務下面的所有IP【單指臨時實體】 List<Instance> instances = service.allIPs(true); // 當前時間距離上次心跳時間超過15s,則將實體健康改為false for (Instance instance : instances) { //如果沒有設定,默認值DEFAULT_HEART_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15); if (System.currentTimeMillis() - instance.getLastBeat() > instance.getInstanceHeartBeatTimeOut()) { if (!instance.isMarked()) { if (instance.isHealthy()) { instance.setHealthy(false); getPushService().serviceChanged(service); ApplicationUtils.publishEvent(new InstanceHeartbeatTimeoutEvent(this, instance)); } } } } if (!getGlobalConfig().isExpireInstance()) { return; } //當前時間距離上次心跳時間超過30s,則將實體從記憶體中洗掉 for (Instance instance : instances) { if (instance.isMarked()) { continue; } //如果沒有設定,默認值DEFAULT_IP_DELETE_TIMEOUT = TimeUnit.SECONDS.toMillis(30); if (System.currentTimeMillis() - instance.getLastBeat() > instance.getIpDeleteTimeout()) { // delete instance,自己向自己發起洗掉請求 deleteIp(instance); } } } catch (Exception e) {...} }

【1.1.1.1.3】匯總一波

//Nacos的健康檢測有兩種模式:

//臨時實體:

    采用客戶端心跳檢測模式,心跳周期5秒

    心跳間隔超過15秒則標記為不健康

    心跳間隔超過30秒則從服務串列洗掉

//永久實體:

    采用服務端主動健康檢測方式

    周期為2000 + 5000毫秒內的亂數

    檢測例外只會標記為不健康,不會洗掉

 

 

【1.1.2】分析addInstance注冊方法

public void addInstance(String namespaceId, String serviceName, boolean ephemeral, Instance... ips) throws NacosException {
    
    String key = KeyBuilder.buildInstanceListKey(namespaceId, serviceName, ephemeral);
    
    Service service = getService(namespaceId, serviceName);
    
    synchronized (service) {
        //將注冊表中已經存在的實體與當前注冊過來的實體給合并為list
        List<Instance> instanceList = addIpAddresses(service, ephemeral, ips);
        
        Instances instances = new Instances();
        instances.setInstanceList(instanceList);
        //將上面合并后的實體串列更新到注冊表中
        //實作類為DelegateConsistencyServiceImpl
        consistencyService.put(key, instances);
    }
}

//構建KEY,ephemeral默認是true,也就是一般認為都是臨時實體
public static String buildInstanceListKey(String namespaceId, String serviceName, boolean ephemeral) {
    return ephemeral ? buildEphemeralInstanceListKey(namespaceId, serviceName)
            : buildPersistentInstanceListKey(namespaceId, serviceName);
}
//臨時實體
private static String buildEphemeralInstanceListKey(String namespaceId, String serviceName) {
    //com.alibaba.nacos.naming.iplist.ephemeral.{namespaceId}##{serviceName}
    return INSTANCE_LIST_KEY_PREFIX + EPHEMERAL_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
}
//持久實體
private static String buildPersistentInstanceListKey(String namespaceId, String serviceName) {
    //com.alibaba.nacos.naming.iplist.{namespaceId}##{serviceName}
    return INSTANCE_LIST_KEY_PREFIX + namespaceId + NAMESPACE_KEY_CONNECTOR + serviceName;
}

private List<Instance> addIpAddresses(Service service, boolean ephemeral, Instance... ips) throws NacosException {
    return updateIpAddresses(service, UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD, ephemeral, ips);
}

public List<Instance> updateIpAddresses(Service service, String action, boolean ephemeral, Instance... ips) throws NacosException {
    
    Datum datum = consistencyService.get(KeyBuilder.buildInstanceListKey(service.getNamespaceId(), service.getName(), ephemeral));
    
    List<Instance> currentIPs = service.allIPs(ephemeral);
    Map<String, Instance> currentInstances = new HashMap<>(currentIPs.size());
    Set<String> currentInstanceIds = Sets.newHashSet();
    
    for (Instance instance : currentIPs) {
        currentInstances.put(instance.toIpAddr(), instance);
        currentInstanceIds.add(instance.getInstanceId());
    }
    
    Map<String, Instance> instanceMap;
    if (datum != null && null != datum.value) {
        instanceMap = setValid(((Instances) datum.value).getInstanceList(), currentInstances);
    } else {
        instanceMap = new HashMap<>(ips.length);
    }
    
    for (Instance instance : ips) {
        if (!service.getClusterMap().containsKey(instance.getClusterName())) {
            Cluster cluster = new Cluster(instance.getClusterName(), service);
            cluster.init();
            service.getClusterMap().put(instance.getClusterName(), cluster);
        }
        
        //我們這次進來的action是add,并不是remove,所以不走這里
        if (UtilsAndCommons.UPDATE_INSTANCE_ACTION_REMOVE.equals(action)) {
            instanceMap.remove(instance.getDatumKey());
        } else {
            //最后就是實體放到map中,以ip+port等資訊為key,value為當前實體
            Instance oldInstance = instanceMap.get(instance.getDatumKey());
            if (oldInstance != null) {
                //覆寫
                instance.setInstanceId(oldInstance.getInstanceId());
            } else {
                //新增
                instance.setInstanceId(instance.generateInstanceId(currentInstanceIds));
            }
            instanceMap.put(instance.getDatumKey(), instance);
        }
        
    }
    
    if (instanceMap.size() <= 0 && UtilsAndCommons.UPDATE_INSTANCE_ACTION_ADD.equals(action)) {
        throw new IllegalArgumentException(...);
    }
    
    return new ArrayList<>(instanceMap.values());
}

//DelegateConsistencyServiceImpl類#put方法
@Override
public void put(String key, Record value) throws NacosException {
    mapConsistencyService(key).put(key, value);
}

//private final PersistentConsistencyServiceDelegateImpl persistentConsistencyService;
//private final EphemeralConsistencyService ephemeralConsistencyService;  //實際上的實作是DistroConsistencyServiceImpl
//根據key值決定臨時節點的方式【AP架構】還會持久節點的方式【AP架構】
private ConsistencyService mapConsistencyService(String key) {
    return KeyBuilder.matchEphemeralKey(key) ? ephemeralConsistencyService : persistentConsistencyService;
}

【1.1.2.1】分析臨時節點的方式【AP架構模式

//DistroConsistencyServiceImpl類#put方法
@Override
public void put(String key, Record value) throws NacosException {
    //添加任務以及添加到記憶體中
    onPut(key, value);
    //同步資料到所有其他節點
    distroProtocol.sync(new DistroKey(key, KeyBuilder.INSTANCE_LIST_KEY_PREFIX), DataOperation.CHANGE, globalConfig.getTaskDispatchPeriod() / 2);
}


public void onPut(String key, Record value) {
    
    if (KeyBuilder.matchEphemeralInstanceListKey(key)) {
        Datum<Instances> datum = new Datum<>();
        //當前服務的所有實體
        datum.value =https://www.cnblogs.com/chafry/p/ (Instances) value;
        datum.key = key;
        datum.timestamp.incrementAndGet();
        //添加到dataStore里面,實際上是放入里面的dataMap里面
        dataStore.put(key, datum);
    }
    
    if (!listeners.containsKey(key)) {
        return;
    }
    //添加異步任務
    notifier.addTask(key, DataOperation.CHANGE);
}

//分析notifier類,既然繼承了Runnable介面必然有run方法 
//GlobalExecutor.submitDistroNotifyTask(notifier); 在DistroConsistencyServiceImpl類中初始化,默認單執行緒
public class Notifier implements Runnable { private ConcurrentHashMap<String, String> services = new ConcurrentHashMap<>(10 * 1024); private BlockingQueue<Pair<String, DataOperation>> tasks = new ArrayBlockingQueue<>(1024 * 1024); public void addTask(String datumKey, DataOperation action) { if (services.containsKey(datumKey) && action == DataOperation.CHANGE) { return; } if (action == DataOperation.CHANGE) { services.put(datumKey, StringUtils.EMPTY); } //存入阻塞佇列里面 tasks.offer(Pair.with(datumKey, action)); } public int getTaskSize() { return tasks.size(); } @Override public void run() { //死回圈不斷處理佇列的資料 for (; ; ) { try { //從佇列里面拿出資料進行處理 Pair<String, DataOperation> pair = tasks.take(); handle(pair); } catch (Throwable e) {..} } } private void handle(Pair<String, DataOperation> pair) { try { //將資料還原 String datumKey = pair.getValue0(); DataOperation action = pair.getValue1(); services.remove(datumKey); int count = 0; if (!listeners.containsKey(datumKey)) { return; } for (RecordListener listener : listeners.get(datumKey)) { count++; try { if (action == DataOperation.CHANGE) { //會呼叫Service類#onChange方法 listener.onChange(datumKey, dataStore.get(datumKey).value); continue; } if (action == DataOperation.DELETE) { listener.onDelete(datumKey); continue; } } catch (Throwable e) {...} } } catch (Throwable e) {...} } }

【1.1.2.1.1】分析Service類#onChange方法

//Service類#onChange方法
@Override
public void onChange(String key, Instances value) throws Exception {
    //對權重的處理,不太重要
    for (Instance instance : value.getInstanceList()) {
        if (instance == null) {
            // Reject this abnormal instance list:
            throw new RuntimeException("got null instance " + key);
        }
        if (instance.getWeight() > 10000.0D) {
            instance.setWeight(10000.0D);
        }
        if (instance.getWeight() < 0.01D && instance.getWeight() > 0.0D) {
            instance.setWeight(0.01D);
        }
    }
    //注冊邏輯在這里
    updateIPs(value.getInstanceList(), KeyBuilder.matchEphemeralInstanceListKey(key));
    //生成校驗比對的哈希值碼
    recalculateChecksum();
}

 

【1.1.2.1.1.1】分析注冊邏輯

//更新注冊表,這里采用了寫時復制思想:即將注冊表拷貝一個副本出來,更新這個副本,但是服務發現的時候還是從注冊表里獲取,待全部更新完畢再將副本替換回注冊表中,這樣就避免了注冊表的讀寫并發問題,這種方式不用加鎖,從而大大提升了性能
//真正的注冊邏輯
public void updateIPs(Collection<Instance> instances, boolean ephemeral) {
    Map<String, List<Instance>> ipMap = new HashMap<>(clusterMap.size());
    for (String clusterName : clusterMap.keySet()) {
        ipMap.put(clusterName, new ArrayList<>());
    }
    
    for (Instance instance : instances) {
        try {
            if (instance == null) {  continue;  }
            
            if (StringUtils.isEmpty(instance.getClusterName())) {
                instance.setClusterName(UtilsAndCommons.DEFAULT_CLUSTER_NAME);
            }
            
            if (!clusterMap.containsKey(instance.getClusterName())) {
                Cluster cluster = new Cluster(instance.getClusterName(), this);
                cluster.init();
                getClusterMap().put(instance.getClusterName(), cluster);
            }
            
            List<Instance> clusterIPs = ipMap.get(instance.getClusterName());
            if (clusterIPs == null) {
                clusterIPs = new LinkedList<>();
                ipMap.put(instance.getClusterName(), clusterIPs);
            }
            
            clusterIPs.add(instance);
        } catch (Exception e) {...}
    }
    
    for (Map.Entry<String, List<Instance>> entry : ipMap.entrySet()) {
        //make every ip mine
        List<Instance> entryIPs = entry.getValue();
        //在這里進行注冊
        clusterMap.get(entry.getKey()).updateIps(entryIPs, ephemeral);
    }
    
    setLastModifiedMillis(System.currentTimeMillis());
  //發送監聽事件 getPushService().serviceChanged(
this); StringBuilder stringBuilder = new StringBuilder(); for (Instance instance : allIPs()) { stringBuilder.append(instance.toIpAddr()).append("_").append(instance.isHealthy()).append(","); } } public void updateIps(List<Instance> ips, boolean ephemeral) { Set<Instance> toUpdateInstances = ephemeral ? ephemeralInstances : persistentInstances; HashMap<String, Instance> oldIpMap = new HashMap<>(toUpdateInstances.size()); for (Instance ip : toUpdateInstances) { oldIpMap.put(ip.getDatumKey(), ip); } //進行新舊合并 List<Instance> updatedIPs = updatedIps(ips, oldIpMap.values()); if (updatedIPs.size() > 0) { for (Instance ip : updatedIPs) { Instance oldIP = oldIpMap.get(ip.getDatumKey()); // do not update the ip validation status of updated ips // because the checker has the most precise result // Only when ip is not marked, don't we update the health status of IP: if (!ip.isMarked()) { ip.setHealthy(oldIP.isHealthy()); } if (ip.isHealthy() != oldIP.isHealthy()) {...} if (ip.getWeight() != oldIP.getWeight()) {...} } } List<Instance> newIPs = subtract(ips, oldIpMap.values()); if (newIPs.size() > 0) { for (Instance ip : newIPs) { HealthCheckStatus.reset(ip); } } List<Instance> deadIPs = subtract(oldIpMap.values(), ips); if (deadIPs.size() > 0) { for (Instance ip : deadIPs) { HealthCheckStatus.remv(ip); } } toUpdateInstances = new HashSet<>(ips); //寫入注冊表的cluster里面的串列,采用參考替換 if (ephemeral) { ephemeralInstances = toUpdateInstances; } else { persistentInstances = toUpdateInstances; } } //新舊合并形成新的串列 private List<Instance> updatedIps(Collection<Instance> newInstance, Collection<Instance> oldInstance) { List<Instance> intersects = (List<Instance>) CollectionUtils.intersection(newInstance, oldInstance); Map<String, Instance> stringIpAddressMap = new ConcurrentHashMap<>(intersects.size()); for (Instance instance : intersects) { stringIpAddressMap.put(instance.getIp() + ":" + instance.getPort(), instance); } Map<String, Integer> intersectMap = new ConcurrentHashMap<>(newInstance.size() + oldInstance.size()); Map<String, Instance> updatedInstancesMap = new ConcurrentHashMap<>(newInstance.size()); Map<String, Instance> newInstancesMap = new ConcurrentHashMap<>(newInstance.size()); for (Instance instance : oldInstance) { if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) { intersectMap.put(instance.toString(), 1); } } for (Instance instance : newInstance) { if (stringIpAddressMap.containsKey(instance.getIp() + ":" + instance.getPort())) { if (intersectMap.containsKey(instance.toString())) { intersectMap.put(instance.toString(), 2); } else { intersectMap.put(instance.toString(), 1); } } newInstancesMap.put(instance.toString(), instance); } for (Map.Entry<String, Integer> entry : intersectMap.entrySet()) { String key = entry.getKey(); Integer value = entry.getValue(); if (value =https://www.cnblogs.com/chafry/p/= 1) { if (newInstancesMap.containsKey(key)) { updatedInstancesMap.put(key, newInstancesMap.get(key)); } } } return new ArrayList<>(updatedInstancesMap.values()); }

 【1.1.2.1.1.1.1】分析getPushService().serviceChanged(this);發送的事件是如何推送給客戶端的

//發送的事件
public void serviceChanged(Service service) {
    // merge some change events to reduce the push frequency:
    if (futureMap.containsKey(UtilsAndCommons.assembleFullServiceName(service.getNamespaceId(), service.getName()))) {
        return;
    }
    
    this.applicationContext.publishEvent(new ServiceChangeEvent(this, service));
}

//使用IDEA的全文搜索,ctrl+shift+F,找到對應onApplicationEvent(ServiceChangeEvent event)
//PushService類#onApplicationEvent方法
@Override
public void onApplicationEvent(ServiceChangeEvent event) {
    Service service = event.getService();
    String serviceName = service.getName();
    String namespaceId = service.getNamespaceId();
    
    Future future = GlobalExecutor.scheduleUdpSender(() -> {
        try {
            Loggers.PUSH.info(serviceName + " is changed, add it to push queue.");
            ConcurrentMap<String, PushClient> clients = clientMap.get(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
            if (MapUtils.isEmpty(clients)) {
                return;
            }
            
            Map<String, Object> cache = new HashMap<>(16);
            long lastRefTime = System.nanoTime();
            for (PushClient client : clients.values()) {
                if (client.zombie()) {
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    clients.remove(client.toString());
                    Loggers.PUSH.debug("client is zombie: " + client.toString());
                    continue;
                }
                
                Receiver.AckEntry ackEntry;
                Loggers.PUSH.debug("push serviceName: {} to client: {}", serviceName, client.toString());
                String key = getPushCacheKey(serviceName, client.getIp(), client.getAgent());
                byte[] compressData = https://www.cnblogs.com/chafry/p/null;
                Map<String, Object> data = https://www.cnblogs.com/chafry/p/null;
                if (switchDomain.getDefaultPushCacheMillis() >= 20000 && cache.containsKey(key)) {
                    org.javatuples.Pair pair = (org.javatuples.Pair) cache.get(key);
                    compressData = (byte[]) (pair.getValue0());
                    data = (Map<String, Object>) pair.getValue1();
                    
                    Loggers.PUSH.debug("[PUSH-CACHE] cache hit: {}:{}", serviceName, client.getAddrStr());
                }
                
                if (compressData != null) {
                    ackEntry = prepareAckEntry(client, compressData, data, lastRefTime);
                } else {
                    ackEntry = prepareAckEntry(client, prepareHostsData(client), lastRefTime);
                    if (ackEntry != null) {
                        cache.put(key, new org.javatuples.Pair<>(ackEntry.origin.getData(), ackEntry.data));
                    }
                }
                //發起請求
                udpPush(ackEntry);
            }
        } catch (Exception e) {...} finally {
            futureMap.remove(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName));
        }
        
    }, 1000, TimeUnit.MILLISECONDS);
    
    futureMap.put(UtilsAndCommons.assembleFullServiceName(namespaceId, serviceName), future);
    
}

private static Receiver.AckEntry udpPush(Receiver.AckEntry ackEntry) {
    if (ackEntry == null) {
        Loggers.PUSH.error("[NACOS-PUSH] ackEntry is null.");
        return null;
    }
    
    if (ackEntry.getRetryTimes() > MAX_RETRY_TIMES) {
        Loggers.PUSH.warn("max re-push times reached, retry times {}, key: {}", ackEntry.retryTimes, ackEntry.key);
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        return ackEntry;
    }
    
    try {
        if (!ackMap.containsKey(ackEntry.key)) {
            totalPush++;
        }
        ackMap.put(ackEntry.key, ackEntry);
        udpSendTimeMap.put(ackEntry.key, System.currentTimeMillis());
        
        Loggers.PUSH.info("send udp packet: " + ackEntry.key);
        //UDP發送
        udpSocket.send(ackEntry.origin);
        
        ackEntry.increaseRetryTime();
        
        GlobalExecutor.scheduleRetransmitter(new Retransmitter(ackEntry),TimeUnit.NANOSECONDS.toMillis(ACK_TIMEOUT_NANOS), TimeUnit.MILLISECONDS);
        
        return ackEntry;
    } catch (Exception e) {
        ackMap.remove(ackEntry.key);
        udpSendTimeMap.remove(ackEntry.key);
        failedPush += 1;
        
        return null;
    }
}

 

【1.1.2.1.1.2】分析recalculateChecksum方法如何生成比對的哈希值碼

//有了解過eureka的增量更新便應該知道,如何知道你自己拉取的資料全不全,就是靠比對這個哈希值碼
//如果服務端上的是【ABCDEF】,而你本地的是【ABCDF】,那么哈希值碼不一樣說明資料缺失了,
public synchronized void recalculateChecksum() {
    List<Instance> ips = allIPs();
    
    StringBuilder ipsString = new StringBuilder();
    ipsString.append(getServiceString());
    
    if (Loggers.SRV_LOG.isDebugEnabled()) {
        Loggers.SRV_LOG.debug("service to json: " + getServiceString());
    }
    
    if (CollectionUtils.isNotEmpty(ips)) {
        Collections.sort(ips);
    }
    
    for (Instance ip : ips) {
        String string = ip.getIp() + ":" + ip.getPort() + "_" + ip.getWeight() + "_" + ip.isHealthy() + "_" + ip.getClusterName();
        ipsString.append(string);
        ipsString.append(",");
    }
    
    checksum = MD5Utils.md5Hex(ipsString.toString(), Constants.ENCODE);
}

 

【1.1.2.1.2】分析集群服務新增資料同步的方法 distroProtocol.sync

//DistroProtocol類#sync方法
public void sync(DistroKey distroKey, DataOperation action, long delay) {
    for (Member each : memberManager.allMembersWithoutSelf()) {
        DistroKey distroKeyWithTarget = new DistroKey(distroKey.getResourceKey(), distroKey.getResourceType(), each.getAddress());
        DistroDelayTask distroDelayTask = new DistroDelayTask(distroKeyWithTarget, action, delay);
        //添加任務,采用異步的方式,會有重試功能
        distroTaskEngineHolder.getDelayTaskExecuteEngine().addTask(distroKeyWithTarget, distroDelayTask);
    }
}

 

【1.1.2.2】分析持久節點的方式(這塊還有部分沒搞懂,后面補上)

 【1.2】分析服務發現的呼叫

@GetMapping("/list")
@Secured(parser = NamingResourceParser.class, action = ActionTypes.READ)
public ObjectNode list(HttpServletRequest request) throws Exception {
    
    String namespaceId = WebUtils.optional(request, CommonParams.NAMESPACE_ID, Constants.DEFAULT_NAMESPACE_ID);
    String serviceName = WebUtils.required(request, CommonParams.SERVICE_NAME);
    NamingUtils.checkServiceNameFormat(serviceName);
    
    String agent = WebUtils.getUserAgent(request);
    String clusters = WebUtils.optional(request, "clusters", StringUtils.EMPTY);
    String clientIP = WebUtils.optional(request, "clientIP", StringUtils.EMPTY);
    int udpPort = Integer.parseInt(WebUtils.optional(request, "udpPort", "0"));
    String env = WebUtils.optional(request, "env", StringUtils.EMPTY);
    boolean isCheck = Boolean.parseBoolean(WebUtils.optional(request, "isCheck", "false"));
    String app = WebUtils.optional(request, "app", StringUtils.EMPTY);
    String tenant = WebUtils.optional(request, "tid", StringUtils.EMPTY);
    boolean healthyOnly = Boolean.parseBoolean(WebUtils.optional(request, "healthyOnly", "false"));
    
    //上面進行引數校驗,這里開始
    return doSrvIpxt(namespaceId, serviceName, agent, clusters, clientIP, udpPort, env, isCheck, app, tenant,healthyOnly);
}

//查看是如何獲取服務進行回傳的
public ObjectNode doSrvIpxt(String namespaceId, String serviceName, String agent, String clusters, String clientIP, int udpPort, String env, boolean isCheck, String app, String tid, boolean healthyOnly) throws Exception {
    
    ClientInfo clientInfo = new ClientInfo(agent);
    ObjectNode result = JacksonUtils.createEmptyJsonNode();
    Service service = serviceManager.getService(namespaceId, serviceName);
    long cacheMillis = switchDomain.getDefaultCacheMillis();
    
    // now try to enable the push
    try {
        //判斷是否支持UDP方式推送,不重要
        if (udpPort > 0 && pushService.canEnablePush(agent)) {
            
            pushService.addClient(namespaceId, serviceName, clusters, agent, new InetSocketAddress(clientIP, udpPort), pushDataSource, tid, app);
            cacheMillis = switchDomain.getPushCacheMillis(serviceName);
        }
    } catch (Exception e) {
        cacheMillis = switchDomain.getDefaultCacheMillis();
    }
    
    if (service == null) {
        result.put("name", serviceName);
        result.put("clusters", clusters);
        result.put("cacheMillis", cacheMillis);
        result.replace("hosts", JacksonUtils.createEmptyArrayNode());
        return result;
    }
    
    checkIfDisabled(service);
    
    List<Instance> srvedIPs;
    //主要是在這里獲取
    srvedIPs = service.srvIPs(Arrays.asList(StringUtils.split(clusters, ",")));
    
    //然后下面主要就是塞資料然后回傳
    // filter ips using selector:
    if (service.getSelector() != null && StringUtils.isNotBlank(clientIP)) {
        srvedIPs = service.getSelector().select(clientIP, srvedIPs);
    }
    
    if (CollectionUtils.isEmpty(srvedIPs)) {
        if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
            result.put("dom", serviceName);
        } else {
            result.put("dom", NamingUtils.getServiceName(serviceName));
        }
        
        result.put("name", serviceName);
        result.put("cacheMillis", cacheMillis);
        result.put("lastRefTime", System.currentTimeMillis());
        result.put("checksum", service.getChecksum());
        result.put("useSpecifiedURL", false);
        result.put("clusters", clusters);
        result.put("env", env);
        result.set("hosts", JacksonUtils.createEmptyArrayNode());
        result.set("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
        return result;
    }
    
    Map<Boolean, List<Instance>> ipMap = new HashMap<>(2);
    ipMap.put(Boolean.TRUE, new ArrayList<>());
    ipMap.put(Boolean.FALSE, new ArrayList<>());
    
    for (Instance ip : srvedIPs) {
        ipMap.get(ip.isHealthy()).add(ip);
    }
    
    if (isCheck) {
        result.put("reachProtectThreshold", false);
    }
    
    double threshold = service.getProtectThreshold();
    
    if ((float) ipMap.get(Boolean.TRUE).size() / srvedIPs.size() <= threshold) {
        
        if (isCheck) {
            result.put("reachProtectThreshold", true);
        }
        
        ipMap.get(Boolean.TRUE).addAll(ipMap.get(Boolean.FALSE));
        ipMap.get(Boolean.FALSE).clear();
    }
    
    if (isCheck) {
        result.put("protectThreshold", service.getProtectThreshold());
        result.put("reachLocalSiteCallThreshold", false);
        
        return JacksonUtils.createEmptyJsonNode();
    }
    
    ArrayNode hosts = JacksonUtils.createEmptyArrayNode();
    
    for (Map.Entry<Boolean, List<Instance>> entry : ipMap.entrySet()) {
        List<Instance> ips = entry.getValue();
        
        if (healthyOnly && !entry.getKey()) {
            continue;
        }
        
        for (Instance instance : ips) {
            
            // remove disabled instance:
            if (!instance.isEnabled()) {
                continue;
            }
            
            ObjectNode ipObj = JacksonUtils.createEmptyJsonNode();
            
            ipObj.put("ip", instance.getIp());
            ipObj.put("port", instance.getPort());
            // deprecated since nacos 1.0.0:
            ipObj.put("valid", entry.getKey());
            ipObj.put("healthy", entry.getKey());
            ipObj.put("marked", instance.isMarked());
            ipObj.put("instanceId", instance.getInstanceId());
            ipObj.set("metadata", JacksonUtils.transferToJsonNode(instance.getMetadata()));
            ipObj.put("enabled", instance.isEnabled());
            ipObj.put("weight", instance.getWeight());
            ipObj.put("clusterName", instance.getClusterName());
            if (clientInfo.type == ClientInfo.ClientType.JAVA
                    && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
                ipObj.put("serviceName", instance.getServiceName());
            } else {
                ipObj.put("serviceName", NamingUtils.getServiceName(instance.getServiceName()));
            }
            
            ipObj.put("ephemeral", instance.isEphemeral());
            hosts.add(ipObj);
            
        }
    }
    
    result.replace("hosts", hosts);
    if (clientInfo.type == ClientInfo.ClientType.JAVA && clientInfo.version.compareTo(VersionUtil.parseVersion("1.0.0")) >= 0) {
        result.put("dom", serviceName);
    } else {
        result.put("dom", NamingUtils.getServiceName(serviceName));
    }
    result.put("name", serviceName);
    result.put("cacheMillis", cacheMillis);
    result.put("lastRefTime", System.currentTimeMillis());
    result.put("checksum", service.getChecksum());
    result.put("useSpecifiedURL", false);
    result.put("clusters", clusters);
    result.put("env", env);
    result.replace("metadata", JacksonUtils.transferToJsonNode(service.getMetadata()));
    return result;
}

 【1.2.1】分析service.srvIPs方法在服務發現的如何獲取所有實體

public List<Instance> srvIPs(List<String> clusters) {
    if (CollectionUtils.isEmpty(clusters)) {
        clusters = new ArrayList<>();
        clusters.addAll(clusterMap.keySet());
    }
    return allIPs(clusters);
}

public List<Instance> allIPs(List<String> clusters) {
    List<Instance> result = new ArrayList<>();
    for (String cluster : clusters) {
        Cluster clusterObj = clusterMap.get(cluster);
        if (clusterObj == null) {
            continue;
        }
        //將臨時實體和持久實體一起回傳
        result.addAll(clusterObj.allIPs());
    }
    return result;
}

public List<Instance> allIPs() {
    List<Instance> allInstances = new ArrayList<>();
    allInstances.addAll(persistentInstances);
    allInstances.addAll(ephemeralInstances);
    return allInstances;
}

 

【2】集群情況下

【2.1】集群節點狀態同步任務

@Component("serverListManager") //自動被掃描成為Bean
public class ServerListManager extends MemberChangeListener {
     public ServerListManager(final SwitchDomain switchDomain, final ServerMemberManager memberManager) {
        this.switchDomain = switchDomain;
        this.memberManager = memberManager;
        NotifyCenter.registerSubscriber(this);
        this.servers = new ArrayList<>(memberManager.allMembers());
    }
    //初始化時候自動呼叫
    @PostConstruct
    public void init() {
        //注冊兩個任務的方法
        //集群節點狀態同步任務
        GlobalExecutor.registerServerStatusReporter(new ServerStatusReporter(), 2000);
        //本地服務更新任務
        GlobalExecutor.registerServerInfoUpdater(new ServerInfoUpdater());
    }
}

private class ServerStatusReporter implements Runnable {
    @Override
    public void run() {
        try {
            
            if (EnvUtil.getPort() <= 0) {
                return;
            }
            
            int weight = Runtime.getRuntime().availableProcessors() / 2;
            if (weight <= 0) {
                weight = 1;
            }
            
            long curTime = System.currentTimeMillis();
            String status = LOCALHOST_SITE + "#" + EnvUtil.getLocalAddress() + "#" + curTime + "#" + weight
                    + "\r\n";
            
            //獲取所有節點
            List<Member> allServers = getServers();
            
            if (!contains(EnvUtil.getLocalAddress())) {
                Loggers.SRV_LOG.error("local ip is not in serverlist, ip: {}, serverlist: {}",
                        EnvUtil.getLocalAddress(), allServers);
                return;
            }
            //遍歷
            if (allServers.size() > 0 && !EnvUtil.getLocalAddress().contains(IPUtil.localHostIP())) {
                for (Member server : allServers) {
                    //排除自己
                    if (Objects.equals(server.getAddress(), EnvUtil.getLocalAddress())) {
                        continue;
                    }
                    
                    // This metadata information exists from 1.3.0 onwards "version"
                    if (server.getExtendVal(MemberMetaDataConstants.VERSION) != null) {
                        continue;
                    }
                    
                    Message msg = new Message();
                    msg.setData(status);
                    //向介面/nacos/v1/ns/operator/server/status發送資料
                    synchronizer.send(server.getAddress(), msg);
                }
            }
        } catch (Exception e) {...} finally {
            //TimeUnit.SECONDS.toMillis(2),也就是每2s一次
            GlobalExecutor.registerServerStatusReporter(this, switchDomain.getServerStatusSynchronizationPeriodMillis());
        }
        
    }
}

 

 

 

【2.2】注冊服務實體資訊在集群節點間同步任務

//ServiceManager類#初始化方法
@PostConstruct
public void init() {
    //每分鐘同步一次
    GlobalExecutor.scheduleServiceReporter(new ServiceReporter(), 60000, TimeUnit.MILLISECONDS);
    
    GlobalExecutor.submitServiceUpdateManager(new UpdatedServiceProcessor());
    
    if (emptyServiceAutoClean) {
        // delay 60s, period 20s;        
        // This task is not recommended to be performed frequently in order to avoid
        // the possibility that the service cache information may just be deleted
        // and then created due to the heartbeat mechanism
        
        GlobalExecutor.scheduleServiceAutoClean(new EmptyServiceAutoClean(), cleanEmptyServiceDelay, cleanEmptyServicePeriod);
    }
    
    try {
        consistencyService.listen(KeyBuilder.SERVICE_META_KEY_PREFIX, this);
    } catch (NacosException e) {...}
}

//同步服務的健康狀態
private class ServiceReporter implements Runnable {
    
    @Override
    public void run() {
        try {
            
            Map<String, Set<String>> allServiceNames = getAllServiceNames();
            
            if (allServiceNames.size() <= 0) {
                //ignore
                return;
            }
            
            for (String namespaceId : allServiceNames.keySet()) {
                
                ServiceChecksum checksum = new ServiceChecksum(namespaceId);
                
                for (String serviceName : allServiceNames.get(namespaceId)) {
                    if (!distroMapper.responsible(serviceName)) {
                        continue;
                    }
                    
                    Service service = getService(namespaceId, serviceName);
                    
                    if (service == null || service.isEmpty()) {
                        continue;
                    }
                    
                    service.recalculateChecksum();
                    
                    checksum.addItem(serviceName, service.getChecksum());
                }
                
                Message msg = new Message();
                
                msg.setData(JacksonUtils.toJson(checksum));
                
                Collection<Member> sameSiteServers = memberManager.allMembers();
                
                if (sameSiteServers == null || sameSiteServers.size() <= 0) {
                    return;
                }
                
                for (Member server : sameSiteServers) {
                    if (server.getAddress().equals(NetUtils.localServer())) {
                        continue;
                    }
                    synchronizer.send(server.getAddress(), msg);
                }
            }
        } catch (Exception e) {...} finally {
            GlobalExecutor.scheduleServiceReporter(this, switchDomain.getServiceStatusSynchronizationPeriodMillis(), TimeUnit.MILLISECONDS);
        }
    }
}

 

Nacos服務注冊表結構:Map<namespace, map<group::servicename,="" service="">>

  結構展示

  

  示例展示

  

 

 

 

Nacos核心功能原始碼架構

 

ConcurrentSkipListMap
法 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/520580.html

標籤:Java

上一篇:SpringCloud(二) - Eureka注冊中心,feign遠程呼叫,hystrix降級和熔斷

下一篇:Java實作郵件發送

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more