Eureka核心功能點
【1】服務注冊(register):Eureka Client會通過發送REST請求的方式向Eureka Server注冊自己的服務,提供自身的元資料,比如ip地址、埠、運行狀況指標的url、主頁地址等資訊,Eureka Server接收到注冊請求后,就會把這些元資料資訊存盤在一個雙層的Map中,
【2】服務續約(renew):在服務注冊后,Eureka Client會維護一個心跳來持續通知Eureka Server,說明服務一直處于可用狀態,防止被剔除,Eureka Client在默認的情況下會每隔30秒(eureka.instance.leaseRenewalIntervalInSeconds)發送一次心跳來進行服務續約,
【3】服務同步(replicate):Eureka Server之間會互相進行注冊,構建Eureka Server集群,不同Eureka Server之間會進行服務同步,用來保證服務資訊的一致性,
【4】獲取服務(get registry):服務消費者(Eureka Client)在啟動的時候,會發送一個REST請求給Eureka Server,獲取上面注冊的服務清單,并且快取在Eureka Client本地,默認快取30秒(eureka.client.registryFetchIntervalSeconds),同時,為了性能考慮,EurekaServer也會維護一份只讀的服務清單快取,該快取每隔30秒更新一次,
【5】服務呼叫:服務消費者在獲取到服務清單后,就可以根據清單中的服務串列資訊,查找到其他服務的地址,從而進行遠程呼叫,Eureka有Region和Zone的概念,一個Region可以包含多個Zone,在進行服務呼叫時,優先訪問處于同一個Zone中的服務提供者,
【6】服務下線(cancel):當Eureka Client需要關倍訓重啟時,就不希望在這個時間段內再有請求進來,所以,就需要提前先發送REST請求給Eureka Server,告訴Eureka Server自己要下線了,Eureka Server在收到請求后,就會把該服務狀態置為下線(DOWN),并把該下線事件傳播出去,
【7】服務剔除(evict):有時候,服務實體可能會因為網路故障等原因導致不能提供服務,而此時該實體也沒有發送請求給Eureka Server來進行服務下線,所以,還需要有服務剔除的機制,Eureka Server在啟動的時候會創建一個定時任務,每隔一段時間(默認60秒),從當前服務清單中把超時沒有續約(默認90秒,eureka.instance.leaseExpirationDurationInSeconds)的服務剔除,180s被剔除
【8】自我保護:既然Eureka Server會定時剔除超時沒有續約的服務,那就有可能出現一種場景,網路一段時間內發生了例外,所有的服務都沒能夠進行續約,Eureka Server就把所有的服務都剔除了,這樣顯然不太合理,所以,就有了自我保護機制,當短時間內,統計續約失敗的比例,如果達到一定閾值,則會觸發自我保護的機制,在該機制下,Eureka Server不會剔除任何的微服務,等到正常后,再退出自我保護機制,自我保護開關(eureka.server.enable-self-preservation: false)
常見的問題
【1】當eureka服務實體有注冊或下線或有實體發生故障,記憶體注冊表雖然會及時更新資料,但是客戶端不一定能及時感知到,可能會過30秒才能感知到,因為客戶端拉取注冊表實體這里面有一個多級快取機制,【實作的是最終一致性】
【2】還有服務剔除的不是默認90秒沒心跳的實體,剔除的是180秒沒心跳的實體(eureka的bug導致,注解有說明是因為加了兩次過期時間,但是很小的BUG所有不修復了【在Lease結構里說明】)
【3】分析eureka服務下線的情況
1)圖示

2)說明
1.客戶端每個30s會發送心跳到服務端 2.ReadOnlyCacheMap和ReadWriteCacheMap每30s同步一次 3.客戶端每隔30s同步一次ReadOnlyCacheMap 4.ribbon快取每隔30s同步一次【有負載均衡的情況】 所以正常下線需要120s 而非正常下線,外加上服務剔除的180s+60s的定時任務,也就是360s【6min】 如果出現時間太長容易出現問題 1.修改 ribbon 同步快取的時間為 3 秒:ribbon.ServerListRefreshInterval = 3000 2.修改客戶端同步快取時間為 3 秒 :eureka.client.registry-fetch-interval-seconds = 3 3.心跳間隔時間修改為 3 秒:eureka.instance.lease-renewal-interval-in-seconds = 3 4.超時剔除的時間改為 9 秒:eureka.instance.lease-expiration-duration-in-seconds = 9 5.清理執行緒定時時間改為 5 秒執行一次:eureka.server.eviction-interval-timer-in-ms = 5000 6.同步到只讀快取的時間修改為 3 秒一次:eureka.server.response-cache-update-interval-ms = 3000 只讀快取其實是可以關閉的,通過修改引數eureka.server.use-read-only-response-cache = false可以做到 正常下線就是 3+3+3+3=12 秒,非正常下線再加 18+5 秒為 35 秒, 因為本質上服務剔除的是超時過期的,而lease可知過期時間實際上是兩倍,也就是18s,考慮極端情況,18s剛好卡在定時任務的最后一刻,則是直接加上5s, 此外,這里的極端情況,也就是從某一次心跳之后開始不正常的,
原始碼精髓總結
【1】服務端多級快取設計思想:
1)在拉取注冊表的時候:
(1)首先從ReadOnlyCacheMap里查快取的注冊表,
(2)若沒有,就找ReadWriteCacheMap里快取的注冊表,
(3)如果還沒有,就從記憶體中獲取實際的注冊表資料,
2)在注冊表發生變更的時候:
(1)會在記憶體中更新變更的注冊表資料,同時過期掉ReadWriteCacheMap,
(2)此過程不會影響ReadOnlyCacheMap提供人家查詢注冊表,
(3)默認每30秒Eureka Server會將ReadWriteCacheMap更新到ReadOnlyCacheMap里
(4)默認每180秒Eureka Server會將ReadWriteCacheMap里是資料失效
(5)下次有服務拉取注冊表,又會從記憶體中獲取最新的資料了,同時填充 各級快取
3)多級快取機制的優點:
(1)盡可能保證了記憶體注冊表資料不會出現頻繁的讀寫沖突問題,
(2)并且進一步保證對Eureka Server的大量請求,都是快速從純記憶體走,性能極高(可以稍微估計下對于一線互聯網公司,內部上千個eureka client實體,每分鐘對eureka大幾千次的訪問,一天就是上千萬次的訪問)
【2】TimedSupervisorTask定時任務的設計:
1)從整體上看,TimedSupervisorTask是固定間隔的周期性任務,一旦遇到超時就會將下一個周期的間隔時間調大,如果連續超時,那么每次間隔時間都會增大一倍,一直到達外部引數設定的上限為止,一旦新任務不再超時,間隔時間又會自動恢復為初始值,另外還有CAS來控制多執行緒同步,
【3】增量更新中哈希碼檢驗的設計:
//里面的一致性哈希碼,本質上就是校驗資料 //如:服務器上全量塊存的是【ABCDEFG】,此時它的哈希碼便是全量塊存的資料的哈希值,增量塊存的是【FG】, //而我們客戶端是【ABCD】,增量拉下來再合并,則為【ABCDFG】,得到的哈希值便會與全量哈希值不一致,代表了缺失一部分資料 //故檢驗不對就會全量拉取
【4】注冊表的結構說明(這個僅是記錄):
實體資訊存放的map,這是個兩層的ConcurrentHashMap<String, Map<String,Lease<InstanceInfo>>>,外層map的key是appName,也就是服務名,內層map的key是instanceId,也就是實體名 注冊表map資料示例如下: { MICROSERVICE - PROVIDER - USER = { DESKTOP - 1 SLJLB7: microservice - provider - user: 8002 = com.netflix.eureka.lease.Lease @2cd36af6, DESKTOP - 1 SLJLB7: microservice - provider - user: 8001 = com.netflix.eureka.lease.Lease @600b7073 }, MICROSERVICE - PROVIDER - ORDER = { DESKTOP - 1 SLJLB7: microservice - provider - order: 8002 = com.netflix.eureka.lease.Lease @2cd36af6, DESKTOP - 1 SLJLB7: microservice - provider - order: 8001 = com.netflix.eureka.lease.Lease @600b7073 } }
Eureka服務端原始碼分析
【1】分析注解@EnableEurekaServer是如何開啟eurekaServer服務注冊功能
@Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(EurekaServerMarkerConfiguration.class) public @interface EnableEurekaServer {} //注釋有說:這個注解是為了激活Eureka相關的配置類EurekaServerAutoConfiguration類 //但是卻是匯入了EurekaServerMarkerConfiguration類
【2】分析匯入的EurekaServerMarkerConfiguration類
//注釋說明:采用Marker的bean去激活EurekaServerAutoConfiguration類 //但實際上并沒有做什么,直接去EurekaServerAutoConfiguration類看他是怎么處理的 @Configuration(proxyBeanMethods = false) public class EurekaServerMarkerConfiguration { @Bean public Marker eurekaServerMarkerBean() { return new Marker(); } class Marker {} }
【3】分析EurekaServerAutoConfiguration類
@Configuration(proxyBeanMethods = false) @Import(EurekaServerInitializerConfiguration.class) //當發現了這里,便明白了,這個配置類要生效是必須要有Marker類的存在 //而且EurekaServerAutoConfiguration類本身是基于SpringBoot的SPI機制,自動匯入的 @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class,InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration implements WebMvcConfigurer {...}
【4】分析EurekaServerAutoConfiguration類中的方法
//初始化集群節點集合 @Bean @ConditionalOnMissingBean public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry, ServerCodecs serverCodecs, ReplicationClientAdditionalFilters replicationClientAdditionalFilters) { return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.applicationInfoManager, replicationClientAdditionalFilters); } //初始化EurekaServer的相關配置 @Configuration(proxyBeanMethods = false) protected static class EurekaServerConfigBeanConfiguration { @Bean @ConditionalOnMissingBean public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) { EurekaServerConfigBean server = new EurekaServerConfigBean(); if (clientConfig.shouldRegisterWithEureka()) { // Set a sensible default if we are supposed to replicate server.setRegistrySyncRetries(5); } return server; } } //初始化一些介面,用于獲取EurekaServer的資訊 @Bean @ConditionalOnProperty(prefix = "eureka.dashboard", name = "enabled", matchIfMissing = true) public EurekaController eurekaController() { return new EurekaController(this.applicationInfoManager); } //基于EurekaServer的配置,注冊表,集群節點集合,以及服務實體初始化EurekaServer背景關系 @Bean public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager); } //初始化經過包裝的Eureka原生啟動類 @Bean public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry, EurekaServerContext serverContext) { return new EurekaServerBootstrap(this.applicationInfoManager, this.eurekaClientConfig, this.eurekaServerConfig, registry, serverContext); } //初始化集群注冊表 @Bean public PeerAwareInstanceRegistry peerAwareInstanceRegistry(ServerCodecs serverCodecs) { this.eurekaClient.getApplications(); // force initialization return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig, serverCodecs, this.eurekaClient, this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(), this.instanceRegistryProperties.getDefaultOpenForTrafficCount()); }
【5】分析EurekaServerAutoConfiguration類匯入的EurekaServerInitializerConfiguration類
//因為實作了SmartLifecycle介面,會在初始化完成后根據isAutoStartup()的回傳值確認是否呼叫start()方法 //故查看EurekaServerInitializerConfiguration類#start()方法 @Override public void start() { new Thread(() -> { try { //初始化EurekaServer,同時啟動Eureka Server eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext); log.info("Started Eureka Server"); //發送Eureka注冊事件 publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig())); // 設定啟動的狀態為true EurekaServerInitializerConfiguration.this.running = true; // 發送Eureka Start事件,其他還有各種事件,我們可以監聽這種時間,然后做一些特定的業務需求 publish(new EurekaServerStartedEvent(getEurekaServerConfig())); } catch (Exception ex) {...} }).start(); } //初始化EurekaServer的運行環境和背景關系 //EurekaServerBootstrap類#contextInitialized方法 public void contextInitialized(ServletContext context) { try { //初始化運行環境 initEurekaEnvironment(); //初始化背景關系 initEurekaServerContext(); context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable e) { throw new RuntimeException(...); } }
【6】分析初始化背景關系initEurekaServerContext方法做了什么【進行了服務同步,服務剔除的啟動】
protected void initEurekaServerContext() throws Exception { // For backward compatibility JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), XStream.PRIORITY_VERY_HIGH); if (isAws(this.applicationInfoManager.getInfo())) { this.awsBinder = new AwsBinderDelegate(this.eurekaServerConfig, this.eurekaClientConfig, this.registry, this.applicationInfoManager); this.awsBinder.start(); } //初始化eureka server背景關系 EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // Copy registry from neighboring eureka node // 從相鄰的eureka節點復制注冊表 int registryCount = this.registry.syncUp(); // 默認每30秒發送心跳,1分鐘就是2次 // 修改eureka狀態為up // 同時,這里面會開啟一個定時任務,用于清理60秒沒有心跳的客戶端,自動下線 // 根據屬性值可知是PeerAwareInstanceRegistry類 this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); } //回傳了一個EurekaServerContextHolder【其實就是將serverContext設定進入到里面當做屬性值】 public static synchronized void initialize(EurekaServerContext serverContext) { holder = new EurekaServerContextHolder(serverContext); }
【7】服務同步的邏輯
//進行服務同步 @Override public int syncUp() { // Copy entire entry from neighboring DS node int count = 0; //從組態檔中拿到注冊的節點 for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { break; } } //呼叫節點的http請求獲取所有的服務實體 Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { if (isRegisterable(instance)) { //將其他節點的實體注冊到本節點 register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) {...} } } } return count; }
【8】服務剔除的邏輯
//進行服務剔除 @Override public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. // 計算每分鐘最大續約數 this.expectedNumberOfClientsSendingRenews = count; // 每分鐘最小續約數 updateRenewsPerMinThreshold(); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); // 設定實體的狀態為UP applicationInfoManager.setInstanceStatus(InstanceStatus.UP); // 開啟定時任務,默認60秒執行一次,用于清理60秒之內沒有續約的實體 super.postInit(); } protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()); } protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); //服務剔除任務 //evictionIntervalTimerInMs = 60 * 1000,即每60s執行一次,且延遲60s evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); } //EvictionTask類#run方法 @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) {...} } //剔除邏輯 public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); internalCancel(appName, id, false); } } }
Eureka客戶端原始碼分析
【1】根據SpringBoot自動裝配先找出所有會呼叫的類
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaClientConfigServerAutoConfiguration,\ org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceAutoConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration,\ org.springframework.cloud.netflix.ribbon.eureka.RibbonEurekaAutoConfiguration,\ org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.reactive.EurekaReactiveDiscoveryClientConfiguration,\ org.springframework.cloud.netflix.eureka.loadbalancer.LoadBalancerEurekaAutoConfiguration org.springframework.cloud.bootstrap.BootstrapConfiguration=\ org.springframework.cloud.netflix.eureka.config.EurekaDiscoveryClientConfigServiceBootstrapConfiguration
【2】找到對應的自動裝配類EurekaClientAutoConfiguration類
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @Import(DiscoveryClientOptionalArgsConfiguration.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @ConditionalOnDiscoveryEnabled @AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class }) @AutoConfigureAfter(name = { "org.springframework.cloud.autoconfigure.RefreshAutoConfiguration", "org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration", "org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration" }) public class EurekaClientAutoConfiguration { //初始化EurekaClient的相關配置 @Bean @ConditionalOnMissingBean(value = EurekaClientConfig.class, search = SearchStrategy.CURRENT) public EurekaClientConfigBean eurekaClientConfigBean(ConfigurableEnvironment env) { EurekaClientConfigBean client = new EurekaClientConfigBean(); if ("bootstrap".equals(this.env.getProperty("spring.config.name"))) { // We don't register during bootstrap by default, but there will be another // chance later. client.setRegisterWithEureka(false); } return client; } //Client啟動時的自動注冊Bean @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaAutoServiceRegistration eurekaAutoServiceRegistration( ApplicationContext context, EurekaServiceRegistry registry, EurekaRegistration registration) { return new EurekaAutoServiceRegistration(context, registry, registration); } //EurekaClient配置類 @Configuration(proxyBeanMethods = false) @ConditionalOnMissingRefreshScope protected static class EurekaClientConfiguration { @Autowired private ApplicationContext context; @Autowired private AbstractDiscoveryClientOptionalArgs<?> optionalArgs; @Bean(destroyMethod = "shutdown") @ConditionalOnMissingBean(value = EurekaClient.class,search = SearchStrategy.CURRENT) public EurekaClient eurekaClient(ApplicationInfoManager manager, EurekaClientConfig config) { return new CloudEurekaClient(manager, config, this.optionalArgs, this.context); } @Bean @ConditionalOnMissingBean(value = ApplicationInfoManager.class, search = SearchStrategy.CURRENT) public ApplicationInfoManager eurekaApplicationInfoManager(EurekaInstanceConfig config) { InstanceInfo instanceInfo = new InstanceInfoFactory().create(config); return new ApplicationInfoManager(config, instanceInfo); } @Bean @ConditionalOnBean(AutoServiceRegistrationProperties.class) @ConditionalOnProperty(value = "spring.cloud.service-registry.auto-registration.enabled", matchIfMissing = true) public EurekaRegistration eurekaRegistration(EurekaClient eurekaClient, CloudEurekaInstanceConfig instanceConfig, ApplicationInfoManager applicationInfoManager, @Autowired( required = false) ObjectProvider<HealthCheckHandler> healthCheckHandler) { return EurekaRegistration.builder(instanceConfig).with(applicationInfoManager) .with(eurekaClient).with(healthCheckHandler).build(); } } .... }
【2.1】分析注解@AutoConfigureAfter匯入的EurekaDiscoveryClientConfiguration類做了什么
@Configuration(proxyBeanMethods = false) @EnableConfigurationProperties @ConditionalOnClass(EurekaClientConfig.class) @ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true) @ConditionalOnDiscoveryEnabled @ConditionalOnBlockingDiscoveryEnabled public class EurekaDiscoveryClientConfiguration { //基于EurekaClientAutoConfiguration的啟動標志 @Deprecated @Bean public Marker eurekaDiscoverClientMarker() { return new Marker(); } //將EurekaClient包裝成EurekaDiscoveryClient @Bean @ConditionalOnMissingBean public EurekaDiscoveryClient discoveryClient(EurekaClient client, EurekaClientConfig clientConfig) { return new EurekaDiscoveryClient(client, clientConfig); } //心跳檢測的處理配置 @Configuration(proxyBeanMethods = false) @ConditionalOnProperty(value = "eureka.client.healthcheck.enabled",matchIfMissing = false) protected static class EurekaHealthCheckHandlerConfiguration { @Autowired(required = false) private StatusAggregator statusAggregator = new SimpleStatusAggregator(); @Bean @ConditionalOnMissingBean(HealthCheckHandler.class) public EurekaHealthCheckHandler eurekaHealthCheckHandler() { return new EurekaHealthCheckHandler(this.statusAggregator); } } @Deprecated class Marker { } //定義了Client配置重刷的監聽器 @Configuration(proxyBeanMethods = false) @ConditionalOnClass(RefreshScopeRefreshedEvent.class) protected static class EurekaClientConfigurationRefresher implements ApplicationListener<RefreshScopeRefreshedEvent> { .... } } //看得出來包裝也只是將配置和客戶端放在了一起 public EurekaDiscoveryClient(EurekaClient eurekaClient, EurekaClientConfig clientConfig) { this.clientConfig = clientConfig; this.eurekaClient = eurekaClient; }
【3】分析EurekaClient的相關配置EurekaClientConfigBean類
//僅列舉了部分 @ConfigurationProperties(EurekaClientConfigBean.PREFIX) public class EurekaClientConfigBean implements EurekaClientConfig, Ordered { //客戶端配置前綴 public static final String PREFIX = "eureka.client"; //public static final String DEFAULT_PREFIX = "/eureka"; //默認的注冊地址 public static final String DEFAULT_URL = "http://localhost:8761" + DEFAULT_PREFIX + "/"; //默認域 public static final String DEFAULT_ZONE = "defaultZone"; private static final int MINUTES = 60; //多長時間從注冊中心服務端拉取一次服務資訊,單位秒;這個就是主動拉取注冊中心上所有服務的實體資訊 private int registryFetchIntervalSeconds = 30; //多長時間復制實體變化到eureka服務端,單位秒;這個配置是復制實體資訊到注冊中心 private int instanceInfoReplicationIntervalSeconds = 30; //實體初始化復制資訊到eureka服務端的間隔時間,所以可以看到,其實實體的初始化階段不是立即復制實體資訊到注冊中心的,單位秒 private int initialInstanceInfoReplicationIntervalSeconds = 40; //eureka服務端的變化,多長時間,客戶端會獲取一次eureka服務的資訊 private int eurekaServiceUrlPollIntervalSeconds = 5 * MINUTES; //eureka server的代理埠 private String proxyPort; //eureka server的代理host name private String proxyHost; //賬號 private String proxyUserName; //密碼 private String proxyPassword; //從server讀取所需的超時時間 private int eurekaServerReadTimeoutSeconds = 8; //連接server的超時時間 private int eurekaServerConnectTimeoutSeconds = 5; //被允許連接到所有server host的總連接數 private int eurekaServerTotalConnections = 200; // 被允許連接到每一個server host的總連接數 private int eurekaServerTotalConnectionsPerHost = 50; //連接到server的http連接的空閑超時時間,超時會被清理掉 private int eurekaConnectionIdleTimeoutSeconds = 30; //heartbeatExecutor 心跳的執行緒數 private int heartbeatExecutorThreadPoolSize = 2; //客戶端初始化階段強制注冊,默認關閉 private boolean shouldEnforceRegistrationAtInit = false; ... }
【4】分析EurekaClientConfiguration配置類里面生成的EurekaClient的Bean
//CloudEurekaClient類【繼承DiscoveryClient類】#構造方法 public CloudEurekaClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs<?> args, ApplicationEventPublisher publisher) { super(applicationInfoManager, config, args); this.applicationInfoManager = applicationInfoManager; this.publisher = publisher; this.eurekaTransportField = ReflectionUtils.findField(DiscoveryClient.class, "eurekaTransport"); ReflectionUtils.makeAccessible(this.eurekaTransportField); } //DiscoveryClient類【繼承EurekaClient(原生的EurekaClient)】#構造方法 public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args) { this(applicationInfoManager, config, args, ResolverUtils::randomize); } public DiscoveryClient(ApplicationInfoManager applicationInfoManager, final EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, EndpointRandomizer randomizer) { //主要是這個this指向,畢竟里面的都是方法傳參 this(applicationInfoManager, config, args, new Provider<BackupRegistry>() { private volatile BackupRegistry backupRegistryInstance; @Override public synchronized BackupRegistry get() { if (backupRegistryInstance == null) { String backupRegistryClassName = config.getBackupRegistryImpl(); if (null != backupRegistryClassName) { try { backupRegistryInstance = (BackupRegistry) Class.forName(backupRegistryClassName).newInstance(); logger.info("Enabled backup registry of type {}", backupRegistryInstance.getClass()); } catch (InstantiationException e) {..} catch (IllegalAccessException e) {..} catch (ClassNotFoundException e) {...} } if (backupRegistryInstance == null) { logger.warn("Using default backup registry implementation which does not do anything."); backupRegistryInstance = new NotImplementedRegistryImpl(); } } return backupRegistryInstance; } }, randomizer); }
【5】分析DiscoveryClient的構造方法
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider, EndpointRandomizer endpointRandomizer) { if (args != null) { this.healthCheckHandlerProvider = args.healthCheckHandlerProvider; this.healthCheckCallbackProvider = args.healthCheckCallbackProvider; this.eventListeners.addAll(args.getEventListeners()); this.preRegistrationHandler = args.preRegistrationHandler; } else { this.healthCheckCallbackProvider = null; this.healthCheckHandlerProvider = null; this.preRegistrationHandler = null; } this.applicationInfoManager = applicationInfoManager; InstanceInfo myInfo = applicationInfoManager.getInfo(); clientConfig = config; staticClientConfig = clientConfig; transportConfig = config.getTransportConfig(); instanceInfo = myInfo; if (myInfo != null) { appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId(); } else {...} this.backupRegistryProvider = backupRegistryProvider; this.endpointRandomizer = endpointRandomizer; this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo); localRegionApps.set(new Applications()); fetchRegistryGeneration = new AtomicLong(0); remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions()); remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(",")); if (config.shouldFetchRegistry()) { this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } if (config.shouldRegisterWithEureka()) { this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L}); } else { this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC; } //從這里開始初始化Eureka Client if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); scheduler = null; heartbeatExecutor = null; cacheRefreshExecutor = null; eurekaTransport = null; instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), clientConfig.getRegion()); // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance() // to work with DI'd DiscoveryClient DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", initTimestampMs, this.getApplications().size()); return; // no need to setup up an network tasks and we are done } try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); //心跳的執行緒池 heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //快取重刷的執行緒池 cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff eurekaTransport = new EurekaTransport(); scheduleServerEndpointTask(eurekaTransport, args); AzToRegionMapper azToRegionMapper; if (clientConfig.shouldUseDnsForFetchingServiceUrls()) { azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig); } else { azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig); } if (null != remoteRegionsToFetch.get()) { azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(",")); } instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion()); } catch (Throwable e) { throw new RuntimeException("Failed to initialize DiscoveryClient!", e); } if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) { fetchRegistryFromBackup(); } // call and execute the pre registration handler before all background tasks (inc registration) is started if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!register() ) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable th) {...} } // finally, init the schedule tasks (e.g. cluster resolvers, heartbeat, instanceInfo replicator, fetch //最核心代碼,初始化定時任務 initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) {...} DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); initTimestampMs = System.currentTimeMillis(); }
【6】核心邏輯initScheduledTasks初始化定時任務,是做了什么
/** * Initializes all scheduled tasks. */ private void initScheduledTasks() { //獲取服務注冊串列資訊 if (clientConfig.shouldFetchRegistry()) { //服務注冊串列更新的周期時間 //默認是30 int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds(); int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); //定時更新服務注冊串列 //這里的延時任務明顯是只呼叫一次,具體在分析他的任務的run方法 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() //該執行緒執行更新的具體邏輯 ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { //服務續約的周期時間 int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); //應用啟動可見此日志,內容是:Starting heartbeat executor: renew interval is: 30 logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // Heartbeat timer // 服務定時續約 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() //該執行緒執行續約的具體邏輯 ), renewalIntervalInSecs, TimeUnit.SECONDS); //這個Runable中含有服務注冊的邏輯 instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } //服務注冊 instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
【6.1】定時任務TimedSupervisorTask類的設計
//TimedSupervisorTask類#run方法 //這里存在一個設計的亮點 public class TimedSupervisorTask extends TimerTask { ... public TimedSupervisorTask(String name, ScheduledExecutorService scheduler, ThreadPoolExecutor executor, int timeout, TimeUnit timeUnit, int expBackOffBound, Runnable task) { this.scheduler = scheduler; this.executor = executor; this.timeoutMillis = timeUnit.toMillis(timeout); this.task = task; //可以看出任務還是需要根據傳入來的 this.delay = new AtomicLong(timeoutMillis); this.maxDelay = timeoutMillis * expBackOffBound; // Initialize the counters and register. successCounter = Monitors.newCounter("success"); timeoutCounter = Monitors.newCounter("timeouts"); rejectedCounter = Monitors.newCounter("rejectedExecutions"); throwableCounter = Monitors.newCounter("throwables"); threadPoolLevelGauge = new LongGauge(MonitorConfig.builder("threadPoolUsed").build()); Monitors.registerObject(name, this); } @Override public void run() { Future<?> future = null; try { future = executor.submit(task); threadPoolLevelGauge.set((long) executor.getActiveCount()); //設定了超時時間 future.get(timeoutMillis, TimeUnit.MILLISECONDS); // block until done or timeout //出現任務不超時的情況又會將延遲時間重置(這里主要是配合下面捕捉例外的超時翻倍情況) delay.set(timeoutMillis); threadPoolLevelGauge.set((long) executor.getActiveCount()); successCounter.increment(); } catch (TimeoutException e) { logger.warn("task supervisor timed out", e); //出現超時的記錄 timeoutCounter.increment(); //將超時時間翻倍(在最大的任務時間內),主動延遲 long currentDelay = delay.get(); long newDelay = Math.min(maxDelay, currentDelay * 2); //設定為最新的值,考慮到多執行緒,所以用了CAS delay.compareAndSet(currentDelay, newDelay); } catch (RejectedExecutionException e) { //一旦執行緒池的阻塞佇列中放滿了待處理任務,觸發了拒絕策略,就會將調度器停掉 if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, reject the task", e); } else { logger.warn("task supervisor rejected the task", e); } //被拒絕的次數 rejectedCounter.increment(); } catch (Throwable e) { if (executor.isShutdown() || scheduler.isShutdown()) { logger.warn("task supervisor shutting down, can't accept the task"); } else { logger.warn("task supervisor threw an exception", e); } throwableCounter.increment(); } finally { if (future != null) { //這里任務要么執行完畢,要么發生例外,都用cancel方法來清理任務; future.cancel(true); } //只要調度器沒有停止,就再指定等待時間之后在執行一次同樣的任務 //任務里面又塞入這個任務 if (!scheduler.isShutdown()) { //假設外部呼叫時傳入的超時時間為30秒(構造方法的入參timeout),最大間隔時間為50秒(構造方法的入參expBackOffBound) //如果最近一次任務沒有超時,那么就在30秒后開始新任務, //如果最近一次任務超時了,那么就在50秒后開始新任務(例外處理中有個乘以二的操作,乘以二后的60秒超過了最大間隔50秒) scheduler.schedule(this, delay.get(), TimeUnit.MILLISECONDS); } } } }
【6.2】分析更新服務注冊串列任務 CacheRefreshThread【獲取服務邏輯】
//DiscoveryClient類的內置類 class CacheRefreshThread implements Runnable { public void run() { refreshRegistry(); } } //DiscoveryClient類#refreshRegistry方法 @VisibleForTesting void refreshRegistry() { try { boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries(); boolean remoteRegionsModified = false; // This makes sure that a dynamic change to remote regions to fetch is honored. String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions(); //不做aws環境的配置這個if邏輯不會執行 if (null != latestRemoteRegions) { String currentRemoteRegions = remoteRegionsToFetch.get(); if (!latestRemoteRegions.equals(currentRemoteRegions)) { // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync synchronized (instanceRegionChecker.getAzToRegionMapper()) { if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) { String[] remoteRegions = latestRemoteRegions.split(","); remoteRegionsRef.set(remoteRegions); instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions); remoteRegionsModified = true; } else {....} } } else { // Just refresh mapping to reflect any DNS/Property change instanceRegionChecker.getAzToRegionMapper().refreshMapping(); } } //獲取注冊資訊方法 boolean success = fetchRegistry(remoteRegionsModified); if (success) { registrySize = localRegionApps.get().size(); lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis(); } if (logger.isDebugEnabled()) {...省略日志內容...} } catch (Throwable e) {...} } private boolean fetchRegistry(boolean forceFullRegistryFetch) { Stopwatch tracer = FETCH_REGISTRY_TIMER.start(); try { // 如果增量被禁用,或者是第一次,那么獲取所有應用程式 // 取出本地快取之前獲取的服務串列資訊 Applications applications = getApplications(); //是否禁用增量更新 if (clientConfig.shouldDisableDelta() || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress())) || forceFullRegistryFetch //是否第一次拉取 || (applications == null) || (applications.getRegisteredApplications().size() == 0) || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta { //全量獲取 getAndStoreFullRegistry(); } else { //增量獲取 getAndUpdateDelta(applications); } //更新本地快取 applications.setAppsHashCode(applications.getReconcileHashCode()); logTotalInstances(); } catch (Throwable e) { return false; } finally { if (tracer != null) { tracer.stop(); } } // Notify about cache refresh before updating the instance remote status //將本地快取更新的事件廣播給所有已注冊的監聽器,注意該方法已被CloudEurekaClient類重寫 onCacheRefreshed(); // Update remote status based on refreshed data held in the cache //檢查剛剛更新的快取中,有來自Eureka server的服務串列,其中包含了當前應用的狀態, //當前實體的成員變數lastRemoteInstanceStatus,記錄的是最后一次更新的當前應用狀態, //上述兩種狀態在updateInstanceRemoteStatus方法中作比較 ,如果不一致,就更新lastRemoteInstanceStatus,并且廣播對應的事件 updateInstanceRemoteStatus(); // registry was fetched successfully, so return true return true; } @Override public Applications getApplications() { return localRegionApps.get(); }
【6.2.1】分析全量更新
private void getAndStoreFullRegistry() throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications apps = null; //由于并沒有配置特別關注的region資訊,因此會呼叫eurekaTransport.queryClient.getApplications方法從服務端獲取服務串列 EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null ? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get()) : eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //回傳物件就是服務串列 apps = httpResponse.getEntity(); } logger.info("The response status is {}", httpResponse.getStatusCode()); if (apps == null) {...} else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { //考慮到多執行緒同步,只有CAS成功的執行緒,才會把自己從Eureka server獲取的資料來替換本地快取 localRegionApps.set(this.filterAndShuffle(apps)); } else {...} } //EurekaHttpClientDecorator類#getApplications方法 @Override public EurekaHttpResponse<Applications> getApplications(final String... regions) { //這里面涉及到配置是否重試 return execute(new RequestExecutor<Applications>() { @Override public EurekaHttpResponse<Applications> execute(EurekaHttpClient delegate) { //呼叫AbstractJerseyEurekaHttpClient類 return delegate.getApplications(regions); } @Override public RequestType getRequestType() { return RequestType.GetApplications; } }); } @Override public EurekaHttpResponse<Applications> getApplications(String... regions) { //取增量資料的path是"apps/delta" return getApplicationsInternal("apps/", regions); } //具體的請求回應處理都在此方法中 private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) { ClientResponse response = null; String regionsParamValue = null; try { //jersey、resource這些關鍵詞都預示著這是個restful請求 WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath); if (regions != null && regions.length > 0) { regionsParamValue = StringUtil.join(regions); webResource = webResource.queryParam("regions", regionsParamValue); } Builder requestBuilder = webResource.getRequestBuilder(); addExtraHeaders(requestBuilder); //發起網路請求,將回應封裝成ClientResponse實體 response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class); Applications applications = null; if (response.getStatus() == Status.OK.getStatusCode() && response.hasEntity()) { //取得全部應用資訊 applications = response.getEntity(Applications.class); } return anEurekaHttpResponse(response.getStatus(), Applications.class) .headers(headersOf(response)) .entity(applications) .build(); } finally { if (response != null) { response.close(); } } } //總結:獲取全量資料,是通過jersey-client庫的API向Eureka server發起restful請求http://localhost:8761/eureka/apps實作的,并將回應的服務串列資料放在一個成員變數中作為本地快取
【6.2.2】分析增量更新
//分析增量更新 //里面的一致性哈希碼,本質上就是校驗資料 //如:服務器上全量塊存的是【ABCDEFG】,此時它的哈希碼便是全量塊存的資料的哈希值,增量塊存的是【FG】, //而我們客戶端是【ABCD】,增量拉下來再合并,則為【ABCDFG】,得到的哈希值便會與全量哈希值不一致,代表了缺失一部分資料 //故檢驗不對就會全量拉取 private void getAndUpdateDelta(Applications applications) throws Throwable { long currentUpdateGeneration = fetchRegistryGeneration.get(); Applications delta = null; //增量資訊是通過eurekaTransport.queryClient.getDelta方法完成的 EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get()); if (httpResponse.getStatusCode() == Status.OK.getStatusCode()) { //delta中保存了Eureka server回傳的增量更新 delta = httpResponse.getEntity(); } //如果沒有 if (delta == null) { //如果增量資訊為空,就直接發起一次全量更新 getAndStoreFullRegistry(); } //考慮到多執行緒同步問題,這里通過CAS來確保請求發起到現在是執行緒安全的, //如果這期間fetchRegistryGeneration變了,就表示其他執行緒也做了類似操作,因此放棄本次回應的資料 else if (fetchRegistryGeneration.compareAndSet(currentUpdateGeneration, currentUpdateGeneration + 1)) { logger.debug("Got delta update with apps hashcode {}", delta.getAppsHashCode()); String reconcileHashCode = ""; if (fetchRegistryUpdateLock.tryLock()) { try { //用Eureka回傳的增量資料和本地資料做合并操作 updateDelta(delta); //用合并了增量資料之后的本地資料來生成一致性哈希碼 reconcileHashCode = getReconcileHashCode(applications); } finally { fetchRegistryUpdateLock.unlock(); } } else {...} //Eureka server在回傳增量更新資料時,也會回傳服務端的一致性哈希碼, //理論上每次本地快取資料經歷了多次增量更新后,計算出的一致性哈希碼應該是和服務端一致的, //如果發現不一致,就證明本地快取的服務串列資訊和Eureka server不一致了,需要做一次全量更新 if (!reconcileHashCode.equals(delta.getAppsHashCode()) || clientConfig.shouldLogDeltaDiff()) { //一致性哈希碼不同,就在reconcileAndLogDifference方法中做全量更新 reconcileAndLogDifference(delta, reconcileHashCode); // this makes a remoteCall } } else {...} } //updateDelta方法將增量更新資料和本地資料做合并 private void updateDelta(Applications delta) { int deltaCount = 0; //遍歷所有服務 for (Application app : delta.getRegisteredApplications()) { //遍歷當前服務的所有實體 for (InstanceInfo instance : app.getInstances()) { //取出快取的所有服務串列,用于合并 Applications applications = getApplications(); String instanceRegion = instanceRegionChecker.getInstanceRegion(instance); //判斷正在處理的實體和當前應用是否在同一個region if (!instanceRegionChecker.isLocalRegion(instanceRegion)) { //如果不是同一個region,接下來合并的資料就換成專門為其他region準備的快取 Applications remoteApps = remoteRegionVsApps.get(instanceRegion); if (null == remoteApps) { remoteApps = new Applications(); remoteRegionVsApps.put(instanceRegion, remoteApps); } applications = remoteApps; } ++deltaCount; //對新增的實體的處理 if (ActionType.ADDED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Added instance {} to the existing apps in region {}", instance.getId(), instanceRegion); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } //對修改實體的處理 else if (ActionType.MODIFIED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp == null) { applications.addApplication(app); } logger.debug("Modified instance {} to the existing apps ", instance.getId()); applications.getRegisteredApplications(instance.getAppName()).addInstance(instance); } //對洗掉實體的處理 else if (ActionType.DELETED.equals(instance.getActionType())) { Application existingApp = applications.getRegisteredApplications(instance.getAppName()); if (existingApp != null) { logger.debug("Deleted instance {} to the existing apps ", instance.getId()); existingApp.removeInstance(instance); /* * We find all instance list from application(The status of instance status is not only the status is UP but also other status) * if instance list is empty, we remove the application. */ if (existingApp.getInstancesAsIsFromEureka().isEmpty()) { applications.removeApplication(existingApp); } } } } } logger.debug("The total number of instances fetched by the delta processor : {}", deltaCount); getApplications().setVersion(delta.getVersion()); //整理資料,使得后續使用程序中,這些應用的實體總是以相同順序回傳 getApplications().shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); //和當前應用不在同一個region的應用,其實體資料也要整理 for (Applications applications : remoteRegionVsApps.values()) { applications.setVersion(delta.getVersion()); applications.shuffleInstances(clientConfig.shouldFilterOnlyUpInstances()); } }
【6.3】分析服務定時續約任務 HeartbeatThread(也就是心跳機制)
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { //發送心跳請求 httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == Status.OK.getStatusCode(); } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
【7】分析服務注冊的instanceInfoReplicator.start方法
public void start(int initialDelayMs) { if (started.compareAndSet(false, true)) { instanceInfo.setIsDirty(); // for initial register Future next = scheduler.schedule(this, initialDelayMs, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } //InstanceInfoReplicator類#run方法 public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { //服務注冊 discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } } boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { //發起注冊請求 httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode(); }
【8】Eureka Server服務端Jersey介面部分分析
【8.1】服務端Jersey介面處理類ApplicationResource
@Produces({"application/xml", "application/json"})
public class ApplicationResource {
...
//注冊一個實體的資訊
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
// validate that the instanceinfo contains all the necessary required fields
// 引數校驗,不符合驗證規則的,回傳400狀態碼,
if (isBlank(info.getId())) {
return Response.status(400).entity("Missing instanceId").build();
} else if (isBlank(info.getHostName())) {
return Response.status(400).entity("Missing hostname").build();
} else if (isBlank(info.getIPAddr())) {
return Response.status(400).entity("Missing ip address").build();
} else if (isBlank(info.getAppName())) {
return Response.status(400).entity("Missing appName").build();
} else if (!appName.equals(info.getAppName())) {
return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
} else if (info.getDataCenterInfo() == null) {
return Response.status(400).entity("Missing dataCenterInfo").build();
} else if (info.getDataCenterInfo().getName() == null) {
return Response.status(400).entity("Missing dataCenterInfo Name").build();
}
// handle cases where clients may be registering with bad DataCenterInfo with missing data
DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
if (dataCenterInfo instanceof UniqueIdentifier) {
String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
if (isBlank(dataCenterInfoId)) {
boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
if (experimental) {
String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
return Response.status(400).entity(entity).build();
} else if (dataCenterInfo instanceof AmazonInfo) {
AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
if (effectiveId == null) {
amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
}
} else {
logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
}
}
}
// 重點在這里,進行注冊
registry.register(info, "true".equals(isReplication));
return Response.status(204).build(); // 204 to be backwards compatible
}
}
【8.1.1】注冊方法分析
@Override public void register(final InstanceInfo info, final boolean isReplication) { int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } super.register(info, leaseDuration, isReplication); replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); } //AbstractInstanceRegistry類#register方法 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { // 上只讀鎖 read.lock(); // 從本地MAP里面獲取當前實體的資訊 //注冊表的結構 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); // 增加注冊次數到監控資訊里面去, REGISTER.increment(isReplication); if (gMap == null) { // 如果第一次進來,那么gMap為空,則創建一個ConcurrentHashMap放入到registry里面去 final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); // putIfAbsent方法主要是在向ConcurrentHashMap中添加鍵—值對的時候,它會先判斷該鍵值對是否已經存在, // 如果不存在(新的entry),那么會向map中添加該鍵值對,并回傳null, // 如果已經存在,那么不會覆寫已有的值,直接回傳已經存在的值, gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { // 表明map中確實不存在,則設定gMap為最新創建的那個 gMap = gNewMap; } } // 從MAP中查詢已經存在的Lease資訊 (比如第二次來) Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // 當Lease的物件不為空時, if (existingLease != null && (existingLease.getHolder() != null)) { // 當instance已經存在是,和客戶端的instance的資訊做比較,時間最新的那個,為有效instance資訊 Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { registrant = existingLease.getHolder(); } } else { // 這里只有當existinglease不存在時,才會進來, 像那種恢復心跳,資訊過期的,都不會進入這里, // Eureka‐Server的自我保護機制做的操作,為每分鐘最大續約數+2 ,同時重新計算每分鐘最小續約數 synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } } // 構建一個最新的Lease資訊 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { // 當原來存在Lease的資訊時,設定他的serviceUpTimestamp, 保證服務開啟的時間一直是第一次的那個 lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } // 放入本地Map中 gMap.put(registrant.getId(), lease); // 添加到最近的注冊佇列里面去,以時間戳作為Key, 名稱作為value,主要是為了運維界面的統計資料, synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // 分析instanceStatus if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // 得到instanceStatus,判斷是否是UP狀態, if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } // 設定注冊型別為添加 registrant.setActionType(ActionType.ADDED); // 租約變更記錄佇列,記錄了實體的每次變化, 用于注冊資訊的增量獲取、 recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); // 清理快取 ,傳入的引數為key invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); } finally { read.unlock(); } }
【8.1.1】分析Lease結構
public class Lease<T> { enum Action { Register, Cancel, Renew }; //租約過期的時間常量,默認90秒,也就說90秒沒有心跳過來,那么這邊將會自動剔除該節點 public static final int DEFAULT_DURATION_IN_SECS = 90; 這個租約是屬于誰的, 目前占用這個屬性的是 private T holder; //租約是啥時候過期的,當服務下線的時候,會過來更新這個時間戳registrationTimestamp : 租約的注冊時間 private long evictionTimestamp; private long registrationTimestamp; //服務啟動時間 ,當客戶端在注冊的時候,instanceInfo的status 為UP的時候,則更新這個時間戳 private long serviceUpTimestamp; //最后更新時間,每次續約的時候,都會更新這個時間戳,在判斷實體是否過期時,需要用到這個屬性, private volatile long lastUpdateTimestamp; //過期時間,毫秒單位 private long duration; public Lease(T r, int durationInSecs) { holder = r; registrationTimestamp = System.currentTimeMillis(); lastUpdateTimestamp = registrationTimestamp; duration = (durationInSecs * 1000); } //更新的時候設定過期時間為當前時間+90S public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } public void cancel() { if (evictionTimestamp <= 0) { evictionTimestamp = System.currentTimeMillis(); } } public void serviceUp() { if (serviceUpTimestamp == 0) { serviceUpTimestamp = System.currentTimeMillis(); } } public void setServiceUpTimestamp(long serviceUpTimestamp) { this.serviceUpTimestamp = serviceUpTimestamp; } public boolean isExpired() { return isExpired(0l); } //這里面存在的問題是過期時間+90S //實際上也就是在更新時候的180s之后才算過期 public boolean isExpired(long additionalLeaseMs) { return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); } public long getRegistrationTimestamp() { return registrationTimestamp; } public long getLastRenewalTimestamp() { return lastUpdateTimestamp; } public long getEvictionTimestamp() { return evictionTimestamp; } public long getServiceUpTimestamp() { return serviceUpTimestamp; } public T getHolder() { return holder; } }
【8.2】客戶端Jersey介面處理類ApplicationsResource
@Path("/{version}/apps")
@Produces({"application/xml", "application/json"})
public class ApplicationsResource {
...
private final EurekaServerConfig serverConfig;
private final PeerAwareInstanceRegistry registry;
private final ResponseCache responseCache;
@Inject
ApplicationsResource(EurekaServerContext eurekaServer) {
this.serverConfig = eurekaServer.getServerConfig();
this.registry = eurekaServer.getRegistry();
this.responseCache = registry.getResponseCache();
}
public ApplicationsResource() {
this(EurekaServerContextHolder.getInstance().getServerContext());
}
//獲取關于特定{@link com.netflix.discovery.shared.Application}的資訊,
@Path("{appId}")
public ApplicationResource getApplicationResource(
@PathParam("version") String version,
@PathParam("appId") String appId) {
CurrentRequestVersion.set(Version.toEnum(version));
return new ApplicationResource(appId, serverConfig, registry);
}
//獲取關于所有{@link com.netflix.discovery.shared.Applications}的資訊,
@GET
public Response getContainers(@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo,
@Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment();
}
// Check if the server allows the access to the registry. The server can
// restrict access if it is not
// ready to serve traffic depending on various reasons.
if (!registry.shouldAllowAccess(isRemoteRegionRequested)) {
return Response.status(Status.FORBIDDEN).build();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
//獲取服務實體對應的快取key
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
Response response;
//是否壓縮
if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) {
response = Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
//從快取里獲取服務實體注冊資訊
response = Response.ok(responseCache.get(cacheKey))
.build();
}
return response;
}
//在{@link com.netflix.discovery.shared.Applications}中獲取關于所有增量更改的資訊,
@Path("delta")
@GET
public Response getContainerDifferential(
@PathParam("version") String version,
@HeaderParam(HEADER_ACCEPT) String acceptHeader,
@HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding,
@HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept,
@Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) {
boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty();
// If the delta flag is disabled in discovery or if the lease expiration
// has been disabled, redirect clients to get all instances
if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) {
return Response.status(Status.FORBIDDEN).build();
}
String[] regions = null;
if (!isRemoteRegionRequested) {
EurekaMonitors.GET_ALL_DELTA.increment();
} else {
regions = regionsStr.toLowerCase().split(",");
Arrays.sort(regions); // So we don't have different caches for same regions queried in different order.
EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment();
}
CurrentRequestVersion.set(Version.toEnum(version));
KeyType keyType = Key.KeyType.JSON;
String returnMediaType = MediaType.APPLICATION_JSON;
if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) {
keyType = Key.KeyType.XML;
returnMediaType = MediaType.APPLICATION_XML;
}
Key cacheKey = new Key(Key.EntityType.Application,
ResponseCacheImpl.ALL_APPS_DELTA,
keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions
);
if (acceptEncoding != null
&& acceptEncoding.contains(HEADER_GZIP_VALUE)) {
return Response.ok(responseCache.getGZIP(cacheKey))
.header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE)
.header(HEADER_CONTENT_TYPE, returnMediaType)
.build();
} else {
return Response.ok(responseCache.get(cacheKey))
.build();
}
}
}
【8.2.1】ApplicationsResource類的getContainers方法分析
//獲取關于所有{@link com.netflix.discovery.shared.Applications}的資訊, @GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } //獲取服務實體對應的快取key Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; //是否壓縮 if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { //從快取里獲取服務實體注冊資訊,從ResponseCacheImpl類中獲取 response = Response.ok(responseCache.get(cacheKey)) .build(); } return response; } //分析responseCache.get方法 //ResponseCacheImpl類#get方法 public String get(final Key key) { return get(key, shouldUseReadOnlyResponseCache); } @VisibleForTesting String get(final Key key, boolean useReadOnlyCache) { Value payload = getValue(key, useReadOnlyCache); if (payload == null || payload.getPayload().equals(EMPTY_PAYLOAD)) { return null; } else { return payload.getPayload(); } } //精髓設計的點,利用了讀寫分離,有種CopyOnWrite的思維 //private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>(); //private final LoadingCache<Key, Value> readWriteCacheMap; @VisibleForTesting Value getValue(final Key key, boolean useReadOnlyCache) { Value payload = null; try { //只讀快取的開啟 if (useReadOnlyCache) { final Value currentPayload = readOnlyCacheMap.get(key); //只讀快取拿不到才去讀寫快取里面拿 if (currentPayload != null) { payload = currentPayload; } else { payload = readWriteCacheMap.get(key); readOnlyCacheMap.put(key, payload); } } else { payload = readWriteCacheMap.get(key); } } catch (Throwable t) {...} return payload; } //ResponseCacheImpl類#構造方法 ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) { this.serverConfig = serverConfig; this.serverCodecs = serverCodecs; this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache(); this.registry = registry; long responseCacheUpdateIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs(); this.readWriteCacheMap = CacheBuilder.newBuilder().initialCapacity(serverConfig.getInitialCapacityOfResponseCache()) //讀寫快取默認180秒會自動定時過期 .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS) .removalListener(new RemovalListener<Key, Value>() { @Override public void onRemoval(RemovalNotification<Key, Value> notification) { Key removedKey = notification.getKey(); if (removedKey.hasRegions()) { Key cloneWithNoRegions = removedKey.cloneWithoutRegions(); regionSpecificKeys.remove(cloneWithNoRegions, removedKey); } } }) .build(new CacheLoader<Key, Value>() { @Override public Value load(Key key) throws Exception { if (key.hasRegions()) { Key cloneWithNoRegions = key.cloneWithoutRegions(); regionSpecificKeys.put(cloneWithNoRegions, key); } //從記憶體注冊表中獲取 Value value =https://www.cnblogs.com/chafry/p/ generatePayload(key); return value; } }); if (shouldUseReadOnlyResponseCache) { //默認30秒用讀寫快取的資料更新只讀快取的資料 timer.schedule(getCacheUpdateTask(), new Date(((System.currentTimeMillis() / responseCacheUpdateIntervalMs) * responseCacheUpdateIntervalMs) + responseCacheUpdateIntervalMs), responseCacheUpdateIntervalMs); } try { Monitors.registerObject(this); } catch (Throwable e) {...} }
Eureka服務端原始碼分析圖

Eureka服務端Jersey介面分析圖

Eureka客戶端原始碼分析圖

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/518927.html
標籤:Java
上一篇:JWT基礎概念詳解
下一篇:RockerMQ啟動Broke報錯 /Library/Internet: No such file or directory
