主頁 > 後端開發 > 微服務組件--注冊中心Spring Cloud Eureka分析

微服務組件--注冊中心Spring Cloud Eureka分析

2022-10-23 07:11:52 後端開發

Eureka核心功能點

【1】服務注冊(register):Eureka Client會通過發送REST請求的方式向Eureka Server注冊自己的服務,提供自身的元資料,比如ip地址、埠、運行狀況指標的url、主頁地址等資訊,Eureka Server接收到注冊請求后,就會把這些元資料資訊存盤在一個雙層的Map中,

【2】服務續約(renew):在服務注冊后,Eureka Client會維護一個心跳來持續通知Eureka Server,說明服務一直處于可用狀態,防止被剔除,Eureka Client在默認的情況下會每隔30秒(eureka.instance.leaseRenewalIntervalInSeconds)發送一次心跳來進行服務續約,

【3】服務同步(replicate):Eureka Server之間會互相進行注冊,構建Eureka Server集群,不同Eureka Server之間會進行服務同步,用來保證服務資訊的一致性,

【4】獲取服務(get registry):服務消費者(Eureka Client)在啟動的時候,會發送一個REST請求給Eureka Server,獲取上面注冊的服務清單,并且快取在Eureka Client本地,默認快取30秒(eureka.client.registryFetchIntervalSeconds),同時,為了性能考慮,EurekaServer也會維護一份只讀的服務清單快取,該快取每隔30秒更新一次,

【5】服務呼叫:服務消費者在獲取到服務清單后,就可以根據清單中的服務串列資訊,查找到其他服務的地址,從而進行遠程呼叫,Eureka有Region和Zone的概念,一個Region可以包含多個Zone,在進行服務呼叫時,優先訪問處于同一個Zone中的服務提供者,

【6】服務下線(cancel):當Eureka Client需要關倍訓重啟時,就不希望在這個時間段內再有請求進來,所以,就需要提前先發送REST請求給Eureka Server,告訴Eureka Server自己要下線了,Eureka Server在收到請求后,就會把該服務狀態置為下線(DOWN),并把該下線事件傳播出去,

【7】服務剔除(evict):有時候,服務實體可能會因為網路故障等原因導致不能提供服務,而此時該實體也沒有發送請求給Eureka Server來進行服務下線,所以,還需要有服務剔除的機制,Eureka Server在啟動的時候會創建一個定時任務,每隔一段時間(默認60秒),從當前服務清單中把超時沒有續約(默認90秒,eureka.instance.leaseExpirationDurationInSeconds)的服務剔除,180s被剔除

【8】自我保護:既然Eureka Server會定時剔除超時沒有續約的服務,那就有可能出現一種場景,網路一段時間內發生了例外,所有的服務都沒能夠進行續約,Eureka Server就把所有的服務都剔除了,這樣顯然不太合理,所以,就有了自我保護機制,當短時間內,統計續約失敗的比例,如果達到一定閾值,則會觸發自我保護的機制,在該機制下,Eureka Server不會剔除任何的微服務,等到正常后,再退出自我保護機制,自我保護開關(eureka.server.enable-self-preservation: false)

 

常見的問題

  【1】當eureka服務實體有注冊或下線或有實體發生故障,記憶體注冊表雖然會及時更新資料,但是客戶端不一定能及時感知到,可能會過30秒才能感知到,因為客戶端拉取注冊表實體這里面有一個多級快取機制,【實作的是最終一致性

  【2】還有服務剔除的不是默認90秒沒心跳的實體,剔除的是180秒沒心跳的實體(eureka的bug導致,注解有說明是因為加了兩次過期時間,但是很小的BUG所有不修復了【在Lease結構里說明】)

  【3】分析eureka服務下線的情況

    1)圖示

      

 

    2)說明

1.客戶端每個30s會發送心跳到服務端
2.ReadOnlyCacheMap和ReadWriteCacheMap每30s同步一次
3.客戶端每隔30s同步一次ReadOnlyCacheMap
4.ribbon快取每隔30s同步一次【有負載均衡的情況】
所以正常下線需要120s
而非正常下線,外加上服務剔除的180s+60s的定時任務,也就是360s【6min】

如果出現時間太長容易出現問題
1.修改 ribbon 同步快取的時間為 3 秒:ribbon.ServerListRefreshInterval = 3000
2.修改客戶端同步快取時間為 3 秒 :eureka.client.registry-fetch-interval-seconds = 3
3.心跳間隔時間修改為 3 秒:eureka.instance.lease-renewal-interval-in-seconds = 3
4.超時剔除的時間改為 9 秒:eureka.instance.lease-expiration-duration-in-seconds = 9
5.清理執行緒定時時間改為 5 秒執行一次:eureka.server.eviction-interval-timer-in-ms = 5000
6.同步到只讀快取的時間修改為 3 秒一次:eureka.server.response-cache-update-interval-ms = 3000
只讀快取其實是可以關閉的,通過修改引數eureka.server.use-read-only-response-cache = false可以做到
正常下線就是 3+3+3+3=12 秒,非正常下線再加 18+5 秒為 35 秒,
因為本質上服務剔除的是超時過期的,而lease可知過期時間實際上是兩倍,也就是18s,考慮極端情況,18s剛好卡在定時任務的最后一刻,則是直接加上5s,
此外,這里的極端情況,也就是從某一次心跳之后開始不正常的,

 

 

原始碼精髓總結

【1】服務端多級快取設計思想

  1)在拉取注冊表的時候:

    (1)首先從ReadOnlyCacheMap里查快取的注冊表,

    (2)若沒有,就找ReadWriteCacheMap里快取的注冊表,

    (3)如果還沒有,就從記憶體中獲取實際的注冊表資料,

  2)在注冊表發生變更的時候:

    (1)會在記憶體中更新變更的注冊表資料,同時過期掉ReadWriteCacheMap,

    (2)此過程不會影響ReadOnlyCacheMap提供人家查詢注冊表,

    (3)默認每30秒Eureka Server會將ReadWriteCacheMap更新到ReadOnlyCacheMap里

    (4)默認每180秒Eureka Server會將ReadWriteCacheMap里是資料失效

    (5)下次有服務拉取注冊表,又會從記憶體中獲取最新的資料了,同時填充 各級快取

  3)多級快取機制的優點:

    (1)盡可能保證了記憶體注冊表資料不會出現頻繁的讀寫沖突問題

    (2)并且進一步保證對Eureka Server的大量請求,都是快速從純記憶體走,性能極高(可以稍微估計下對于一線互聯網公司,內部上千個eureka client實體,每分鐘對eureka大幾千次的訪問,一天就是上千萬次的訪問)

 

【2】TimedSupervisorTask定時任務的設計

  1)從整體上看,TimedSupervisorTask是固定間隔的周期性任務,一旦遇到超時就會將下一個周期的間隔時間調大,如果連續超時,那么每次間隔時間都會增大一倍,一直到達外部引數設定的上限為止,一旦新任務不再超時,間隔時間又會自動恢復為初始值,另外還有CAS來控制多執行緒同步,

 

【3】增量更新中哈希碼檢驗的設計

//里面的一致性哈希碼,本質上就是校驗資料
//如:服務器上全量塊存的是【ABCDEFG】,此時它的哈希碼便是全量塊存的資料的哈希值,增量塊存的是【FG】,
//而我們客戶端是【ABCD】,增量拉下來再合并,則為【ABCDFG】,得到的哈希值便會與全量哈希值不一致,代表了缺失一部分資料
//故檢驗不對就會全量拉取

 

【4】注冊表的結構說明(這個僅是記錄):

實體資訊存放的map,這是個兩層的ConcurrentHashMap<String, Map<String,Lease<InstanceInfo>>>,外層map的key是appName,也就是服務名,內層map的key是instanceId,也就是實體名
注冊表map資料示例如下:
{
    MICROSERVICE - PROVIDER - USER = {
        DESKTOP - 1 SLJLB7: microservice - provider - user: 8002 = com.netflix.eureka.lease.Lease @2cd36af6,
        DESKTOP - 1 SLJLB7: microservice - provider - user: 8001 = com.netflix.eureka.lease.Lease @600b7073
    },
    MICROSERVICE - PROVIDER - ORDER = {
        DESKTOP - 1 SLJLB7: microservice - provider - order: 8002 = com.netflix.eureka.lease.Lease @2cd36af6,
        DESKTOP - 1 SLJLB7: microservice - provider - order: 8001 = com.netflix.eureka.lease.Lease @600b7073
    }
}

 

Eureka服務端原始碼分析

【1】分析注解@EnableEurekaServer是如何開啟eurekaServer服務注冊功能

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {}

//注釋有說:這個注解是為了激活Eureka相關的配置類EurekaServerAutoConfiguration類
//但是卻是匯入了EurekaServerMarkerConfiguration類

【2】分析匯入的EurekaServerMarkerConfiguration類

//注釋說明:采用Marker的bean去激活EurekaServerAutoConfiguration類
//但實際上并沒有做什么,直接去EurekaServerAutoConfiguration類看他是怎么處理的
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {
    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {}
}

【3】分析EurekaServerAutoConfiguration類

@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
//當發現了這里,便明白了,這個配置類要生效是必須要有Marker類的存在
//而且EurekaServerAutoConfiguration類本身是基于SpringBoot的SPI機制,自動匯入的
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {...}

【4】分析EurekaServerAutoConfiguration類中的方法

//初始化集群節點集合
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
    return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
            this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
            replicationClientAdditionalFilters);
}

//初始化EurekaServer的相關配置
@Configuration(proxyBeanMethods = false)
protected static class EurekaServerConfigBeanConfiguration {
    @Bean
    @ConditionalOnMissingBean
    public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
        EurekaServerConfigBean server = new EurekaServerConfigBean();
        if (clientConfig.shouldRegisterWithEureka()) {
            // Set a sensible default if we are supposed to replicate
            server.setRegistrySyncRetries(5);
        }
        return server;
    }
}

//初始化一些介面,用于獲取EurekaServer的資訊
@Bean
@ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true)
public EurekaController eurekaController() {
    return new EurekaController(this.applicationInfoManager);
}

//基于EurekaServer的配置,注冊表,集群節點集合,以及服務實體初始化EurekaServer背景關系
@Bean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager);
}

//初始化經過包裝的Eureka原生啟動類
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) {
    return new EurekaServerBootstrap(this.applicationInfoManager,
            this.eurekaClientConfig, this.eurekaServerConfig, registry,
            serverContext);
}

//初始化集群注冊表
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

【5】分析EurekaServerAutoConfiguration類匯入的EurekaServerInitializerConfiguration

//因為實作了SmartLifecycle介面,會在初始化完成后根據isAutoStartup()的回傳值確認是否呼叫start()方法
//故查看EurekaServerInitializerConfiguration類#start()方法
@Override
public void start() {
    new Thread(() -> {
        try {
            //初始化EurekaServer,同時啟動Eureka Server
            eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");
            //發送Eureka注冊事件
            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            // 設定啟動的狀態為true
            EurekaServerInitializerConfiguration.this.running = true;
            // 發送Eureka Start事件,其他還有各種事件,我們可以監聽這種時間,然后做一些特定的業務需求
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
        }
        catch (Exception ex) {...}
    }).start();
}

//初始化EurekaServer的運行環境和背景關系
//EurekaServerBootstrap類#contextInitialized方法
public void contextInitialized(ServletContext context) {
    try {
        //初始化運行環境
        initEurekaEnvironment();
        //初始化背景關系
        initEurekaServerContext();

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    }
    catch (Throwable e) {
        throw new RuntimeException(...);
    }
}

【6】分析初始化背景關系initEurekaServerContext方法做了什么【進行了服務同步,服務剔除的啟動】

protected void initEurekaServerContext() throws Exception {
    // For backward compatibility
    JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);
    XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(),
            XStream.PRIORITY_VERY_HIGH);

    if (isAws(this.applicationInfoManager.getInfo())) {
        this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig,
                this.eurekaClientConfig, this.registry, this.applicationInfoManager);
        this.awsBinder.start();
    }
    //初始化eureka server背景關系
    EurekaServerContextHolder.initialize(this.serverContext);

    log.info("Initialized server context");

    // Copy registry from neighboring eureka node
    // 從相鄰的eureka節點復制注冊表
    int registryCount = this.registry.syncUp();
    // 默認每30秒發送心跳,1分鐘就是2次
    // 修改eureka狀態為up
    // 同時,這里面會開啟一個定時任務,用于清理60秒沒有心跳的客戶端,自動下線
    // 根據屬性值可知是PeerAwareInstanceRegistry類
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);

    // Register all monitoring statistics.
    EurekaMonitors.registerAllStats();
}

//回傳了一個EurekaServerContextHolder【其實就是將serverContext設定進入到里面當做屬性值】
public static synchronized void initialize(EurekaServerContext serverContext) {
    holder = new EurekaServerContextHolder(serverContext);
}

【7】服務同步的邏輯

//進行服務同步
@Override
public int syncUp() {
    // Copy entire entry from neighboring DS node
    int count = 0;
    //從組態檔中拿到注冊的節點
    for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
        if (i > 0) {
            try {
                Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
            } catch (InterruptedException e) { break; }
        }
        //呼叫節點的http請求獲取所有的服務實體
        Applications apps = eurekaClient.getApplications();
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                try {
                    if (isRegisterable(instance)) {
                        //將其他節點的實體注冊到本節點
                        register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                        count++;
                    }
                } catch (Throwable t) {...}
            }
        }
    }
    return count;
}

【8】服務剔除的邏輯

//進行服務剔除
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // Renewals happen every 30 seconds and for a minute it should be a factor of 2.
    // 計算每分鐘最大續約數
    this.expectedNumberOfClientsSendingRenews = count;
    // 每分鐘最小續約數
    updateRenewsPerMinThreshold();
    
    this.startupTime = System.currentTimeMillis();
    if (count > 0) {
        this.peerInstancesTransferEmptyOnStartup = false;
    }
    DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName();
    boolean isAws = Name.Amazon == selfName;
    if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) {
        primeAwsReplicas(applicationInfoManager);
    }
    logger.info("Changing status to UP");
    // 設定實體的狀態為UP
    applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
    // 開啟定時任務,默認60秒執行一次,用于清理60秒之內沒有續約的實體
    super.postInit();
}

protected void updateRenewsPerMinThreshold() {
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}

protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    //服務剔除任務
    //evictionIntervalTimerInMs = 60 * 1000,即每60s執行一次,且延遲60s
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
//EvictionTask類#run方法
@Override
public void run() {
    try {
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {...}
}

//剔除邏輯
public void evict(long additionalLeaseMs) {
    logger.debug("Running the evict task");

    if (!isLeaseExpirationEnabled()) {
        logger.debug("DS: lease expiration is currently disabled.");
        return;
    }

    // We collect first all expired items, to evict them in random order. For large eviction sets,
    // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it,
    // the impact should be evenly distributed across all applications.
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }

    // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for
    // triggering self-preservation. Without that we would wipe out full registry.
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit);

        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            // Pick a random item (Knuth shuffle algorithm)
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            internalCancel(appName, id, false);
        }
    }
}

 

Eureka客戶端原始碼分析

【1】根據SpringBoot自動裝配先找出所有會呼叫的類

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\
org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\
org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\
org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration

org.springframework.cloud.bootstrap.BootstrapConfiguration=\
org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration

【2】找到對應的自動裝配類EurekaClientAutoConfiguration類

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@Import(DiscoveryClientOptionalArgsConfiguration.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
        CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
@AutoConfigureAfter(name = {
        "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
        "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
        "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" })
public class EurekaClientAutoConfiguration {
    //初始化EurekaClient的相關配置
    @Bean
    @ConditionalOnMissingBean(value = EurekaClientConfig.class,
            search = SearchStrategy.CURRENT)
    public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) {
        EurekaClientConfigBean client = new EurekaClientConfigBean();
        if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) {
            // We don't register during bootstrap by default, but there will be another
            // chance later.
            client.setRegisterWithEureka(false);
        }
        return client;
    }

    //Client啟動時的自動注冊Bean
    @Bean
    @ConditionalOnBean(AutoServiceRegistrationProperties.class)
    @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
    public EurekaAutoServiceRegistration eurekaAutoServiceRegistration(
            ApplicationContext context, EurekaServiceRegistry registry,
            EurekaRegistration registration) {
        return new EurekaAutoServiceRegistration(context, registry, registration);
    }

    //EurekaClient配置類
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnMissingRefreshScope
    protected static class EurekaClientConfiguration {

        @Autowired
        private ApplicationContext context;

        @Autowired
        private AbstractDiscoveryClientOptionalArgs<?> optionalArgs;

        @Bean(destroyMethod = "shutdown")
        @ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT)
        public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) {
            return new CloudEurekaClient(manager, config, this.optionalArgs, this.context);
        }

        @Bean
        @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT)
        public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) {
            InstanceInfo instanceInfo = new InstanceInfoFactory().create(config);
            return new ApplicationInfoManager(config, instanceInfo);
        }

        @Bean
        @ConditionalOnBean(AutoServiceRegistrationProperties.class)
        @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true)
        public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) {
            return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager)
                    .with(eurekaClient).with(healthCheckHandler).build();
        }

    }
....
}

【2.1】分析注解@AutoConfigureAfter匯入的EurekaDiscoveryClientConfiguration類做了什么

@Configuration(proxyBeanMethods = false)
@EnableConfigurationProperties
@ConditionalOnClass(EurekaClientConfig.class)
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)
@ConditionalOnDiscoveryEnabled
@ConditionalOnBlockingDiscoveryEnabled
public class EurekaDiscoveryClientConfiguration {

    //基于EurekaClientAutoConfiguration的啟動標志
    @Deprecated
    @Bean
    public Marker eurekaDiscoverClientMarker() {
        return new Marker();
    }

    //將EurekaClient包裝成EurekaDiscoveryClient
    @Bean
    @ConditionalOnMissingBean
    public EurekaDiscoveryClient discoveryClient(EurekaClient client,
            EurekaClientConfig clientConfig) {
        return new EurekaDiscoveryClient(client, clientConfig);
    }

    //心跳檢測的處理配置
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnProperty(value = "eureka.client.healthcheck.enabled",matchIfMissing = false)
    protected static class EurekaHealthCheckHandlerConfiguration {

        @Autowired(required = false)
        private StatusAggregator statusAggregator = new SimpleStatusAggregator();

        @Bean
        @ConditionalOnMissingBean(HealthCheckHandler.class)
        public EurekaHealthCheckHandler eurekaHealthCheckHandler() {
            return new EurekaHealthCheckHandler(this.statusAggregator);
        }

    }

    @Deprecated
    class Marker {

    }

    //定義了Client配置重刷的監聽器
    @Configuration(proxyBeanMethods = false)
    @ConditionalOnClass(RefreshScopeRefreshedEvent.class)
    protected static class EurekaClientConfigurationRefresher implements ApplicationListener<RefreshScopeRefreshedEvent> {
        ....
    }

}

//看得出來包裝也只是將配置和客戶端放在了一起
public EurekaDiscoveryClient(EurekaClient eurekaClient,
        EurekaClientConfig clientConfig) {
    this.clientConfig = clientConfig;
    this.eurekaClient = eurekaClient;
}

 

【3】分析EurekaClient的相關配置EurekaClientConfigBean類

//僅列舉了部分
@ConfigurationProperties(EurekaClientConfigBean.PREFIX)
public class EurekaClientConfigBean implements EurekaClientConfig, Ordered {
    //客戶端配置前綴
    public static final String PREFIX = "eureka.client";
    //public static final String DEFAULT_PREFIX = "/eureka";
    //默認的注冊地址
    public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX + "/";
    //默認域
    public static final String DEFAULT_ZONE = "defaultZone";

    private static final int MINUTES = 60;

    //多長時間從注冊中心服務端拉取一次服務資訊,單位秒;這個就是主動拉取注冊中心上所有服務的實體資訊
    private int registryFetchIntervalSeconds = 30;
    //多長時間復制實體變化到eureka服務端,單位秒;這個配置是復制實體資訊到注冊中心
    private int instanceInfoReplicationIntervalSeconds = 30;
    //實體初始化復制資訊到eureka服務端的間隔時間,所以可以看到,其實實體的初始化階段不是立即復制實體資訊到注冊中心的,單位秒
    private int initialInstanceInfoReplicationIntervalSeconds = 40;
    //eureka服務端的變化,多長時間,客戶端會獲取一次eureka服務的資訊
    private int eurekaServiceUrlPollIntervalSeconds = 5 * MINUTES;
    //eureka server的代理埠
    private String proxyPort;
    //eureka server的代理host name
    private String proxyHost;
    //賬號
    private String proxyUserName;
    //密碼
    private String proxyPassword;
    //從server讀取所需的超時時間
    private int eurekaServerReadTimeoutSeconds = 8;
    //連接server的超時時間
    private int eurekaServerConnectTimeoutSeconds = 5;
    //被允許連接到所有server host的總連接數
    private int eurekaServerTotalConnections = 200;
    // 被允許連接到每一個server host的總連接數
    private int eurekaServerTotalConnectionsPerHost = 50;
    //連接到server的http連接的空閑超時時間,超時會被清理掉
    private int eurekaConnectionIdleTimeoutSeconds = 30;
    //heartbeatExecutor 心跳的執行緒數
    private int heartbeatExecutorThreadPoolSize = 2;
    //客戶端初始化階段強制注冊,默認關閉
    private boolean shouldEnforceRegistrationAtInit = false;

...
}

 

【4】分析EurekaClientConfiguration配置類里面生成的EurekaClient的Bean

//CloudEurekaClient類【繼承DiscoveryClient類】#構造方法
public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) {
    super(applicationInfoManager, config, args);
    this.applicationInfoManager = applicationInfoManager;
    this.publisher = publisher;
    this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class,
            "eurekaTransport");
    ReflectionUtils.makeAccessible(this.eurekaTransportField);
}

//DiscoveryClient類【繼承EurekaClient(原生的EurekaClient)】#構造方法
public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) {
    this(applicationInfoManager, config, args, ResolverUtils::randomize);
}

public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) {
    //主要是這個this指向,畢竟里面的都是方法傳參
    this(applicationInfoManager, config, args, new Provider<BackupRegistry>() {
        private volatile BackupRegistry backupRegistryInstance;

        @Override
        public synchronized BackupRegistry get() {
            if (backupRegistryInstance == null) {
                String backupRegistryClassName = config.getBackupRegistryImpl();
                if (null != backupRegistryClassName) {
                    try {
                        backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance();
                        logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass());
                    } catch (InstantiationException e) {..} catch (IllegalAccessException e) {..} catch (ClassNotFoundException e) {...}
                }

                if (backupRegistryInstance == null) {
                    logger.warn("Using default backup registry implementation which does not do anything.");
                    backupRegistryInstance = new NotImplementedRegistryImpl();
                }
            }

            return backupRegistryInstance;
        }
    }, randomizer);
}

 

【5】分析DiscoveryClient的構造方法

@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) {
    if (args != null) {
        this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
        this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
        this.eventListeners.addAll(args.getEventListeners());
        this.preRegistrationHandler = args.preRegistrationHandler;
    } else {
        this.healthCheckCallbackProvider = null;
        this.healthCheckHandlerProvider = null;
        this.preRegistrationHandler = null;
    }
    
    this.applicationInfoManager = applicationInfoManager;
    InstanceInfo myInfo = applicationInfoManager.getInfo();

    clientConfig = config;
    staticClientConfig = clientConfig;
    transportConfig = config.getTransportConfig();
    instanceInfo = myInfo;
    if (myInfo != null) {
        appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
    } else {...}

    this.backupRegistryProvider = backupRegistryProvider;
    this.endpointRandomizer = endpointRandomizer;
    this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
    localRegionApps.set(new Applications());

    fetchRegistryGeneration = new AtomicLong(0);

    remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
    remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));

    if (config.shouldFetchRegistry()) {
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    if (config.shouldRegisterWithEureka()) {
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
    } else {
        this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
    }

    //從這里開始初始化Eureka Client
    if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) {
        logger.info("Client configured to neither register nor query for data.");
        scheduler = null;
        heartbeatExecutor = null;
        cacheRefreshExecutor = null;
        eurekaTransport = null;
        instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion());

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
        logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}",
                initTimestampMs, this.getApplications().size());

        return;  // no need to setup up an network tasks and we are done
    }

    try {
        // default size of 2 - 1 each for heartbeat and cacheRefresh
        scheduler = Executors.newScheduledThreadPool(2,
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-%d")
                        .setDaemon(true)
                        .build());
        //心跳的執行緒池
        heartbeatExecutor = new ThreadPoolExecutor(
                1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff
        //快取重刷的執行緒池
        cacheRefreshExecutor = new ThreadPoolExecutor(
                1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactoryBuilder()
                        .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
                        .setDaemon(true)
                        .build()
        );  // use direct handoff

        eurekaTransport = new EurekaTransport();
        scheduleServerEndpointTask(eurekaTransport, args);

        AzToRegionMapper azToRegionMapper;
        if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
            azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
        } else {
            azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
        }
        if (null != remoteRegionsToFetch.get()) {
            azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
        }
        instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
    } catch (Throwable e) {
        throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
    }

    if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
        fetchRegistryFromBackup();
    }

    // call and execute the pre registration handler before all background tasks (inc registration) is started
    if (this.preRegistrationHandler != null) {
        this.preRegistrationHandler.beforeRegistration();
    }

    if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
        try {
            if (!register() ) {
                throw new IllegalStateException("Registration error at startup. Invalid server response.");
            }
        } catch (Throwable th) {...}
    }

    // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch
    //最核心代碼,初始化定時任務
    initScheduledTasks();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {...}

    DiscoveryManager.getInstance().setDiscoveryClient(this);
    DiscoveryManager.getInstance().setEurekaClientConfig(config);

    initTimestampMs = System.currentTimeMillis();
}

 

【6】核心邏輯initScheduledTasks初始化定時任務,是做了什么

/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    //獲取服務注冊串列資訊
    if (clientConfig.shouldFetchRegistry()) {
        //服務注冊串列更新的周期時間
        //默認是30
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        //定時更新服務注冊串列
        //這里的延時任務明顯是只呼叫一次,具體在分析他的任務的run方法
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()   //該執行緒執行更新的具體邏輯
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        //服務續約的周期時間
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        //應用啟動可見此日志,內容是:Starting heartbeat executor: renew interval is: 30
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

        // Heartbeat timer
        // 服務定時續約
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()  //該執行緒執行續約的具體邏輯
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        //這個Runable中含有服務注冊的邏輯
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }
        //服務注冊
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}

 

【6.1】定時任務TimedSupervisorTask類的設計

//TimedSupervisorTask類#run方法
//這里存在一個設計的亮點
public class TimedSupervisorTask extends TimerTask {
    ...

    public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor,
                               int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) {
        this.scheduler = scheduler;
        this.executor = executor;
        this.timeoutMillis = timeUnit.toMillis(timeout);
        this.task = task;  //可以看出任務還是需要根據傳入來的
        this.delay = new AtomicLong(timeoutMillis);
        this.maxDelay = timeoutMillis * expBackOffBound;
        // Initialize the counters and register.
        successCounter = Monitors.newCounter("success");
        timeoutCounter = Monitors.newCounter("timeouts");
        rejectedCounter = Monitors.newCounter("rejectedExecutions");
        throwableCounter = Monitors.newCounter("throwables");
        threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build());
        Monitors.registerObject(name, this);
    }

    @Override
    public void run() {
        Future<?> future = null;
        try {
            future = executor.submit(task);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            //設定了超時時間
            future.get(timeoutMillis, TimeUnit.MILLISECONDS);  // block until done or timeout
            //出現任務不超時的情況又會將延遲時間重置(這里主要是配合下面捕捉例外的超時翻倍情況)
            delay.set(timeoutMillis);
            threadPoolLevelGauge.set((long) executor.getActiveCount());
            successCounter.increment();
        } catch (TimeoutException e) {
            logger.warn("task supervisor timed out", e);
            //出現超時的記錄
            timeoutCounter.increment();
            //將超時時間翻倍(在最大的任務時間內),主動延遲
            long currentDelay = delay.get();
            long newDelay = Math.min(maxDelay, currentDelay * 2);
            //設定為最新的值,考慮到多執行緒,所以用了CAS
            delay.compareAndSet(currentDelay, newDelay);

        } catch (RejectedExecutionException e) {
            //一旦執行緒池的阻塞佇列中放滿了待處理任務,觸發了拒絕策略,就會將調度器停掉
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, reject the task", e);
            } else {
                logger.warn("task supervisor rejected the task", e);
            }
            //被拒絕的次數
            rejectedCounter.increment();
        } catch (Throwable e) {
            if (executor.isShutdown() || scheduler.isShutdown()) {
                logger.warn("task supervisor shutting down, can't accept the task");
            } else {
                logger.warn("task supervisor threw an exception", e);
            }

            throwableCounter.increment();
        } finally {
            if (future != null) {
                //這里任務要么執行完畢,要么發生例外,都用cancel方法來清理任務;
                future.cancel(true);
            }
            //只要調度器沒有停止,就再指定等待時間之后在執行一次同樣的任務
            //任務里面又塞入這個任務
            if (!scheduler.isShutdown()) {
                //假設外部呼叫時傳入的超時時間為30秒(構造方法的入參timeout),最大間隔時間為50秒(構造方法的入參expBackOffBound)
                //如果最近一次任務沒有超時,那么就在30秒后開始新任務,
                //如果最近一次任務超時了,那么就在50秒后開始新任務(例外處理中有個乘以二的操作,乘以二后的60秒超過了最大間隔50秒)
                scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS);
            }
        }
    }
}

 

【6.2】分析更新服務注冊串列任務 CacheRefreshThread【獲取服務邏輯】

//DiscoveryClient類的內置類
class CacheRefreshThread implements Runnable {
    public void run() {
        refreshRegistry();
    }
}

//DiscoveryClient類#refreshRegistry方法
@VisibleForTesting
void refreshRegistry() {
    try {
        boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

        boolean remoteRegionsModified = false;
        // This makes sure that a dynamic change to remote regions to fetch is honored.
        String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
        //不做aws環境的配置這個if邏輯不會執行
        if (null != latestRemoteRegions) {
            String currentRemoteRegions = remoteRegionsToFetch.get();
            if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                    if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                        String[] remoteRegions = latestRemoteRegions.split(",");
                        remoteRegionsRef.set(remoteRegions);
                        instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                        remoteRegionsModified = true;
                    } else {....}
                }
            } else {
                // Just refresh mapping to reflect any DNS/Property change
                instanceRegionChecker.getAzToRegionMapper().refreshMapping();
            }
        }
        //獲取注冊資訊方法
        boolean success = fetchRegistry(remoteRegionsModified);
        if (success) {
            registrySize = localRegionApps.get().size();
            lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
        }

        if (logger.isDebugEnabled()) {...省略日志內容...}
    } catch (Throwable e) {...}
}

private boolean fetchRegistry(boolean forceFullRegistryFetch) {
    Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

    try {
        // 如果增量被禁用,或者是第一次,那么獲取所有應用程式
        // 取出本地快取之前獲取的服務串列資訊
        Applications applications = getApplications();
        //是否禁用增量更新
        if (clientConfig.shouldDisableDelta()
                || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                || forceFullRegistryFetch
                //是否第一次拉取
                || (applications == null)
                || (applications.getRegisteredApplications().size() == 0)
                || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
        {
            //全量獲取
            getAndStoreFullRegistry();
        } else {
            //增量獲取
            getAndUpdateDelta(applications);
        }
        //更新本地快取
        applications.setAppsHashCode(applications.getReconcileHashCode());
        logTotalInstances();
    } catch (Throwable e) {
        return false;
    } finally {
        if (tracer != null) {
            tracer.stop();
        }
    }

    // Notify about cache refresh before updating the instance remote status
    //將本地快取更新的事件廣播給所有已注冊的監聽器,注意該方法已被CloudEurekaClient類重寫
    onCacheRefreshed();
    // Update remote status based on refreshed data held in the cache
    //檢查剛剛更新的快取中,有來自Eureka server的服務串列,其中包含了當前應用的狀態,
    //當前實體的成員變數lastRemoteInstanceStatus,記錄的是最后一次更新的當前應用狀態,
    //上述兩種狀態在updateInstanceRemoteStatus方法中作比較 ,如果不一致,就更新lastRemoteInstanceStatus,并且廣播對應的事件
    updateInstanceRemoteStatus();
    // registry was fetched successfully, so return true
    return true;
}

@Override
public Applications getApplications() {
    return localRegionApps.get();
}

 

【6.2.1】分析全量更新

private void getAndStoreFullRegistry() throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications apps = null;
    //由于并沒有配置特別關注的region資訊,因此會呼叫eurekaTransport.queryClient.getApplications方法從服務端獲取服務串列
    EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
            ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
            : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        //回傳物件就是服務串列
        apps = httpResponse.getEntity();
    }
    logger.info("The response status is {}", httpResponse.getStatusCode());

    if (apps == null) {...} 
    else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        //考慮到多執行緒同步,只有CAS成功的執行緒,才會把自己從Eureka server獲取的資料來替換本地快取
        localRegionApps.set(this.filterAndShuffle(apps));
    } else {...}
}

//EurekaHttpClientDecorator類#getApplications方法
@Override
public EurekaHttpResponse<Applications> getApplications(final String... regions) {
    //這里面涉及到配置是否重試
    return execute(new RequestExecutor<Applications>() {
        @Override
        public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) {
            //呼叫AbstractJerseyEurekaHttpClient類
            return delegate.getApplications(regions);
        }

        @Override
        public RequestType getRequestType() {
            return RequestType.GetApplications;
        }
    });
}

@Override
public EurekaHttpResponse<Applications> getApplications(String... regions) {
    //取增量資料的path是"apps/delta"
    return getApplicationsInternal("apps/", regions);
}

//具體的請求回應處理都在此方法中
private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
    ClientResponse response = null;
    String regionsParamValue = null;
    try {
        //jersey、resource這些關鍵詞都預示著這是個restful請求
        WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
        if (regions != null && regions.length > 0) {
            regionsParamValue = StringUtil.join(regions);
            webResource = webResource.queryParam("regions", regionsParamValue);
        }
        Builder requestBuilder = webResource.getRequestBuilder();
        addExtraHeaders(requestBuilder);
        //發起網路請求,將回應封裝成ClientResponse實體
        response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);

        Applications applications = null;
        if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) {
            //取得全部應用資訊
            applications = response.getEntity(Applications.class);
        }
        return anEurekaHttpResponse(response.getStatus(), Applications.class)
                .headers(headersOf(response))
                .entity(applications)
                .build();
    } finally {
        if (response != null) {
            response.close();
        }
    }
}

//總結:獲取全量資料,是通過jersey-client庫的API向Eureka server發起restful請求http://localhost:8761/eureka/apps實作的,并將回應的服務串列資料放在一個成員變數中作為本地快取

 

【6.2.2】分析增量更新

//分析增量更新
//里面的一致性哈希碼,本質上就是校驗資料
//如:服務器上全量塊存的是【ABCDEFG】,此時它的哈希碼便是全量塊存的資料的哈希值,增量塊存的是【FG】,
//而我們客戶端是【ABCD】,增量拉下來再合并,則為【ABCDFG】,得到的哈希值便會與全量哈希值不一致,代表了缺失一部分資料
//故檢驗不對就會全量拉取
private void getAndUpdateDelta(Applications applications) throws Throwable {
    long currentUpdateGeneration = fetchRegistryGeneration.get();

    Applications delta = null;
    //增量資訊是通過eurekaTransport.queryClient.getDelta方法完成的
    EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
    if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) {
        //delta中保存了Eureka server回傳的增量更新
        delta = httpResponse.getEntity();
    }
    //如果沒有
    if (delta == null) {
        //如果增量資訊為空,就直接發起一次全量更新
        getAndStoreFullRegistry();
    } 
    //考慮到多執行緒同步問題,這里通過CAS來確保請求發起到現在是執行緒安全的,
    //如果這期間fetchRegistryGeneration變了,就表示其他執行緒也做了類似操作,因此放棄本次回應的資料
    else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) {
        logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode());
        String reconcileHashCode = "";
        if (fetchRegistryUpdateLock.tryLock()) {
            try {
                //用Eureka回傳的增量資料和本地資料做合并操作
                updateDelta(delta);
                //用合并了增量資料之后的本地資料來生成一致性哈希碼
                reconcileHashCode = getReconcileHashCode(applications);
            } finally {
                fetchRegistryUpdateLock.unlock();
            }
        } else {...}
        //Eureka server在回傳增量更新資料時,也會回傳服務端的一致性哈希碼,
        //理論上每次本地快取資料經歷了多次增量更新后,計算出的一致性哈希碼應該是和服務端一致的,
        //如果發現不一致,就證明本地快取的服務串列資訊和Eureka server不一致了,需要做一次全量更新
        if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) {
            //一致性哈希碼不同,就在reconcileAndLogDifference方法中做全量更新
            reconcileAndLogDifference(delta, reconcileHashCode);  // this makes a remoteCall
        }
    } else {...}
}

//updateDelta方法將增量更新資料和本地資料做合并
private void updateDelta(Applications delta) {
    int deltaCount = 0;
    //遍歷所有服務
    for (Application app : delta.getRegisteredApplications()) {
        //遍歷當前服務的所有實體
        for (InstanceInfo instance : app.getInstances()) {
            //取出快取的所有服務串列,用于合并
            Applications applications = getApplications();
            String instanceRegion = instanceRegionChecker.getInstanceRegion(instance);
            //判斷正在處理的實體和當前應用是否在同一個region
            if (!instanceRegionChecker.isLocalRegion(instanceRegion)) {
                //如果不是同一個region,接下來合并的資料就換成專門為其他region準備的快取
                Applications remoteApps = remoteRegionVsApps.get(instanceRegion);
                if (null == remoteApps) {
                    remoteApps = new Applications();
                    remoteRegionVsApps.put(instanceRegion, remoteApps);
                }
                applications = remoteApps;
            }

            ++deltaCount;
             //對新增的實體的處理
            if (ActionType.ADDED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion);
                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);
            } 
             //對修改實體的處理
            else if (ActionType.MODIFIED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp == null) {
                    applications.addApplication(app);
                }
                logger.debug("Modified instance {} to the existing apps ", instance.getId());

                applications.getRegisteredApplications(instance.getAppName()).addInstance(instance);

            } 
            //對洗掉實體的處理
            else if (ActionType.DELETED.equals(instance.getActionType())) {
                Application existingApp = applications.getRegisteredApplications(instance.getAppName());
                if (existingApp != null) {
                    logger.debug("Deleted instance {} to the existing apps ", instance.getId());
                    existingApp.removeInstance(instance);
                    /*
                     * We find all instance list from application(The status of instance status is not only the status is UP but also other status)
                     * if instance list is empty, we remove the application.
                     */
                    if (existingApp.getInstancesAsIsFromEureka().isEmpty()) {
                        applications.removeApplication(existingApp);
                    }
                }
            }
        }
    }
    logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount);

    getApplications().setVersion(delta.getVersion());
    //整理資料,使得后續使用程序中,這些應用的實體總是以相同順序回傳
    getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    //和當前應用不在同一個region的應用,其實體資料也要整理
    for (Applications applications : remoteRegionVsApps.values()) {
        applications.setVersion(delta.getVersion());
        applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances());
    }
}

 

【6.3】分析服務定時續約任務 HeartbeatThread(也就是心跳機制)

private class HeartbeatThread implements Runnable {

    public void run() {
        if (renew()) {
            lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
        }
    }
}

boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        //發送心跳請求
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == Status.OK.getStatusCode();
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}

 

【7】分析服務注冊的instanceInfoReplicator.start方法

public void start(int initialDelayMs) {
    if (started.compareAndSet(false, true)) {
        instanceInfo.setIsDirty();  // for initial register
        Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

//InstanceInfoReplicator類#run方法
public void run() {
    try {
        discoveryClient.refreshInstanceInfo();

        Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
        if (dirtyTimestamp != null) {
            //服務注冊
            discoveryClient.register();
            instanceInfo.unsetIsDirty(dirtyTimestamp);
        }
    } catch (Throwable t) {
        logger.warn("There was a problem with the instance info replicator", t);
    } finally {
        Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
        scheduledPeriodicRef.set(next);
    }
}

boolean register() throws Throwable {
    logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
    EurekaHttpResponse<Void> httpResponse;
    try {
        //發起注冊請求
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
}

 

【8】Eureka Server服務端Jersey介面部分分析

【8.1】服務端Jersey介面處理類ApplicationResource

@Produces({"application/xml", "application/json"})
public class ApplicationResource {
    ...
    //注冊一個實體的資訊
    @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        // 引數校驗,不符合驗證規則的,回傳400狀態碼,
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
        // 重點在這里,進行注冊
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

}

 

【8.1.1】注冊方法分析

@Override
public void register(final InstanceInfo info, final boolean isReplication) {
    int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }
    super.register(info, leaseDuration, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}

//AbstractInstanceRegistry類#register方法
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        // 上只讀鎖
        read.lock();
        // 從本地MAP里面獲取當前實體的資訊
        //注冊表的結構
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        // 增加注冊次數到監控資訊里面去,
        REGISTER.increment(isReplication);
        if (gMap == null) {
            // 如果第一次進來,那么gMap為空,則創建一個ConcurrentHashMap放入到registry里面去
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            // putIfAbsent方法主要是在向ConcurrentHashMap中添加鍵—值對的時候,它會先判斷該鍵值對是否已經存在,
            // 如果不存在(新的entry),那么會向map中添加該鍵值對,并回傳null,
            // 如果已經存在,那么不會覆寫已有的值,直接回傳已經存在的值,
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                // 表明map中確實不存在,則設定gMap為最新創建的那個
                gMap = gNewMap;
            }
        }
        // 從MAP中查詢已經存在的Lease資訊 (比如第二次來)
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // 當Lease的物件不為空時,
        if (existingLease != null && (existingLease.getHolder() != null)) {
            // 當instance已經存在是,和客戶端的instance的資訊做比較,時間最新的那個,為有效instance資訊
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();

            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                registrant = existingLease.getHolder();
            }
        } else {
            // 這里只有當existinglease不存在時,才會進來, 像那種恢復心跳,資訊過期的,都不會進入這里,
            // Eureka‐Server的自我保護機制做的操作,為每分鐘最大續約數+2 ,同時重新計算每分鐘最小續約數
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
        }
        // 構建一個最新的Lease資訊
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            // 當原來存在Lease的資訊時,設定他的serviceUpTimestamp, 保證服務開啟的時間一直是第一次的那個
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // 放入本地Map中
        gMap.put(registrant.getId(), lease);
        // 添加到最近的注冊佇列里面去,以時間戳作為Key, 名稱作為value,主要是為了運維界面的統計資料,
        synchronized (recentRegisteredQueue) {
            recentRegisteredQueue.add(new Pair<Long, String>(
                    System.currentTimeMillis(),
                    registrant.getAppName() + "(" + registrant.getId() + ")"));
        }
        // 分析instanceStatus
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                            + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // 得到instanceStatus,判斷是否是UP狀態,
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // 設定注冊型別為添加
        registrant.setActionType(ActionType.ADDED);
        // 租約變更記錄佇列,記錄了實體的每次變化, 用于注冊資訊的增量獲取、
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        registrant.setLastUpdatedTimestamp();
         // 清理快取 ,傳入的引數為key
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
    } finally {
        read.unlock();
    }
}

 

【8.1.1】分析Lease結構

public class Lease<T> {

    enum Action {
        Register, Cancel, Renew
    };
    //租約過期的時間常量,默認90秒,也就說90秒沒有心跳過來,那么這邊將會自動剔除該節點
    public static final int DEFAULT_DURATION_IN_SECS = 90;
    這個租約是屬于誰的, 目前占用這個屬性的是
    private T holder;
    //租約是啥時候過期的,當服務下線的時候,會過來更新這個時間戳registrationTimestamp : 租約的注冊時間
    private long evictionTimestamp;
    private long registrationTimestamp;
    //服務啟動時間 ,當客戶端在注冊的時候,instanceInfo的status 為UP的時候,則更新這個時間戳
    private long serviceUpTimestamp;
    //最后更新時間,每次續約的時候,都會更新這個時間戳,在判斷實體是否過期時,需要用到這個屬性,
    private volatile long lastUpdateTimestamp;
    //過期時間,毫秒單位
    private long duration;

    public Lease(T r, int durationInSecs) {
        holder = r;
        registrationTimestamp = System.currentTimeMillis();
        lastUpdateTimestamp = registrationTimestamp;
        duration = (durationInSecs * 1000);

    }

    //更新的時候設定過期時間為當前時間+90S
    public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;

    }

    public void cancel() {
        if (evictionTimestamp <= 0) {
            evictionTimestamp = System.currentTimeMillis();
        }
    }

    public void serviceUp() {
        if (serviceUpTimestamp == 0) {
            serviceUpTimestamp = System.currentTimeMillis();
        }
    }

    public void setServiceUpTimestamp(long serviceUpTimestamp) {
        this.serviceUpTimestamp = serviceUpTimestamp;
    }

    public boolean isExpired() {
        return isExpired(0l);
    }
    //這里面存在的問題是過期時間+90S
    //實際上也就是在更新時候的180s之后才算過期
    public boolean isExpired(long additionalLeaseMs) {
        return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }

    public long getRegistrationTimestamp() {
        return registrationTimestamp;
    }

    public long getLastRenewalTimestamp() {
        return lastUpdateTimestamp;
    }

    public long getEvictionTimestamp() {
        return evictionTimestamp;
    }

    public long getServiceUpTimestamp() {
        return serviceUpTimestamp;
    }

    public T getHolder() {
        return holder;
    }

}

 

【8.2】客戶端Jersey介面處理類ApplicationsResource

@Path("/{version}/apps")
@Produces({"application/xml", "application/json"})
public class ApplicationsResource {
    ...
    private final EurekaServerConfig serverConfig;
    private final PeerAwareInstanceRegistry registry;
    private final ResponseCache responseCache;

    @Inject
    ApplicationsResource(EurekaServerContext eurekaServer) {
        this.serverConfig = eurekaServer.getServerConfig();
        this.registry = eurekaServer.getRegistry();
        this.responseCache = registry.getResponseCache();
    }

    public ApplicationsResource() {
        this(EurekaServerContextHolder.getInstance().getServerContext());
    }

    //獲取關于特定{@link com.netflix.discovery.shared.Application}的資訊,
    @Path("{appId}")
    public ApplicationResource getApplicationResource(
            @PathParam("version") String version,
            @PathParam("appId") String appId) {
        CurrentRequestVersion.set(Version.toEnum(version));
        return new ApplicationResource(appId, serverConfig, registry);
    }

    //獲取關于所有{@link com.netflix.discovery.shared.Applications}的資訊,
    @GET
    public Response getContainers(@PathParam("version") String version,
                                  @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                                  @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
                                  @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                                  @Context UriInfo uriInfo,
                                  @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
        }

        // Check if the server allows the access to the registry. The server can
        // restrict access if it is not
        // ready to serve traffic depending on various reasons.
        if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
            return Response.status(Status.FORBIDDEN).build();
        }
        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }
        //獲取服務實體對應的快取key
        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        Response response;
        //是否壓縮
        if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            response = Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            //從快取里獲取服務實體注冊資訊
            response = Response.ok(responseCache.get(cacheKey))
                    .build();
        }
        return response;
    }

    //在{@link com.netflix.discovery.shared.Applications}中獲取關于所有增量更改的資訊,
    @Path("delta")
    @GET
    public Response getContainerDifferential(
            @PathParam("version") String version,
            @HeaderParam(HEADER_ACCEPT) String acceptHeader,
            @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
            @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
            @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

        boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();

        // If the delta flag is disabled in discovery or if the lease expiration
        // has been disabled, redirect clients to get all instances
        if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
            return Response.status(Status.FORBIDDEN).build();
        }

        String[] regions = null;
        if (!isRemoteRegionRequested) {
            EurekaMonitors.GET_ALL_DELTA.increment();
        } else {
            regions = regionsStr.toLowerCase().split(",");
            Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
            EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
        }

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        String returnMediaType = MediaType.APPLICATION_JSON;
        if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
            keyType = Key.KeyType.XML;
            returnMediaType = MediaType.APPLICATION_XML;
        }

        Key cacheKey = new Key(Key.EntityType.Application,
                ResponseCacheImpl.ALL_APPS_DELTA,
                keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
        );

        if (acceptEncoding != null
                && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
            return Response.ok(responseCache.getGZIP(cacheKey))
                    .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                    .header(HEADER_CONTENT_TYPE, returnMediaType)
                    .build();
        } else {
            return Response.ok(responseCache.get(cacheKey))
                    .build();
        }
    }
}

 

【8.2.1】ApplicationsResource類的getContainers方法分析

//獲取關于所有{@link com.netflix.discovery.shared.Applications}的資訊,
@GET
public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader,
                              @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
                              @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {

    boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
    String[] regions = null;
    if (!isRemoteRegionRequested) {
        EurekaMonitors.GET_ALL.increment();
    } else {
        regions = regionsStr.toLowerCase().split(",");
        Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
        EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
    }

    // Check if the server allows the access to the registry. The server can
    // restrict access if it is not
    // ready to serve traffic depending on various reasons.
    if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
        return Response.status(Status.FORBIDDEN).build();
    }
    CurrentRequestVersion.set(Version.toEnum(version));
    KeyType keyType = Key.KeyType.JSON;
    String returnMediaType = MediaType.APPLICATION_JSON;
    if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
        keyType = Key.KeyType.XML;
        returnMediaType = MediaType.APPLICATION_XML;
    }
    //獲取服務實體對應的快取key
    Key cacheKey = new Key(Key.EntityType.Application,
            ResponseCacheImpl.ALL_APPS,
            keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
    );

    Response response;
    //是否壓縮
    if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
        response = Response.ok(responseCache.getGZIP(cacheKey))
                .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
                .header(HEADER_CONTENT_TYPE, returnMediaType)
                .build();
    } else {
        //從快取里獲取服務實體注冊資訊,從ResponseCacheImpl類中獲取
        response = Response.ok(responseCache.get(cacheKey))
                .build();
    }
    return response;
}

//分析responseCache.get方法
//ResponseCacheImpl類#get方法
public String get(final Key key) {
    return get(key, shouldUseReadOnlyResponseCache);
}

@VisibleForTesting
String get(final Key key, boolean useReadOnlyCache) {
    Value payload = getValue(key, useReadOnlyCache);
    if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) {
        return null;
    } else {
        return payload.getPayload();
    }
}

//精髓設計的點,利用了讀寫分離,有種CopyOnWrite的思維
//private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
//private final LoadingCache<Key, Value> readWriteCacheMap;
@VisibleForTesting
Value getValue(final Key key, boolean useReadOnlyCache) {
    Value payload = null;
    try {
        //只讀快取的開啟
        if (useReadOnlyCache) {
            final Value currentPayload = readOnlyCacheMap.get(key);
            //只讀快取拿不到才去讀寫快取里面拿
            if (currentPayload != null) {
                payload = currentPayload;
            } else {
                payload = readWriteCacheMap.get(key);
                readOnlyCacheMap.put(key, payload);
            }
        } else {
            payload = readWriteCacheMap.get(key);
        }
    } catch (Throwable t) {...}
    return payload;
}

//ResponseCacheImpl類#構造方法
ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
    this.serverConfig = serverConfig;
    this.serverCodecs = serverCodecs;
    this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
    this.registry = registry;

    long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();
    this.readWriteCacheMap =
            CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                    //讀寫快取默認180秒會自動定時過期
                    .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                    .removalListener(new RemovalListener<Key, Value>() {
                        @Override
                        public void onRemoval(RemovalNotification<Key, Value> notification) {
                            Key removedKey = notification.getKey();
                            if (removedKey.hasRegions()) {
                                Key cloneWithNoRegions = removedKey.cloneWithoutRegions();
                                regionSpecificKeys.remove(cloneWithNoRegions, removedKey);
                            }
                        }
                    })
                    .build(new CacheLoader<Key, Value>() {
                        @Override
                        public Value load(Key key) throws Exception {
                            if (key.hasRegions()) {
                                Key cloneWithNoRegions = key.cloneWithoutRegions();
                                regionSpecificKeys.put(cloneWithNoRegions, key);
                            }
                            //從記憶體注冊表中獲取
                            Value value =https://www.cnblogs.com/chafry/archive/2022/10/22/ generatePayload(key);
                            return value;
                        }
                    });

    if (shouldUseReadOnlyResponseCache) {
        //默認30秒用讀寫快取的資料更新只讀快取的資料
        timer.schedule(getCacheUpdateTask(),
                new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs)
                        + responseCacheUpdateIntervalMs),
                responseCacheUpdateIntervalMs);
    }

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {...}
}

 

 

Eureka服務端原始碼分析圖


Eureka服務端Jersey介面分析圖

    

Eureka客戶端原始碼分析圖

 

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/518948.html

標籤:其他

上一篇:JWT基礎概念詳解

下一篇:PHP記憶體木馬病毒實作原理剖析

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more