主頁 > 後端開發 > SpringCloud 原始碼系列(4)—— 負載均衡 Ribbon

SpringCloud 原始碼系列(4)—— 負載均衡 Ribbon

2020-12-17 07:00:49 後端開發

一、負載均衡

1、RestTemplate

在研究 eureka 原始碼上篇中,我們在 demo-consumer 消費者服務中定義了用 @LoadBalanced 標記的 RestTemplate,然后使用 RestTemplate 通過服務名的形式來呼叫遠程服務 demo-producer,然后請求會輪詢到兩個 demo-producer 實體上,

RestTemplate 是 Spring Resources 中一個訪問第三方 RESTful API 介面的網路請求框架,RestTemplate 是用來消費 REST 服務的,所以 RestTemplate 的主要方法都與 REST 的 Http協議的一些方法緊密相連,例如 HEAD、GET、POST、PUT、DELETE 和 OPTIONS 等方法,這些方法在 RestTemplate 類對應的方法為 headForHeaders()、getForObject()、postForObject()、put() 和 delete() 等,

RestTemplate 本身是不具備負載均衡的能力的,如果 RestTemplate 未使用 @LoadBalanced 標記,就通過服務名的形式來呼叫,必然會報錯,用 @LoadBalanced 標記后,呼叫 RestTemplate 的 REST 方法就會通過負載均衡的方式通過一定的策略路由到某個服務實體上,底層負責負載均衡的組件就是 Ribbon,后面我們再來看 @LoadBalanced 是如何讓 RestTemplate 具備負載均衡的能力的,

 1 @SpringBootApplication
 2 public class ConsumerApplication {
 3 
 4     @Bean
 5     @LoadBalanced
 6     public RestTemplate restTemplate() {
 7         return new RestTemplate();
 8     }
 9 
10     public static void main(String[] args) {
11         SpringApplication.run(ConsumerApplication.class, args);
12     }
13 }
14 
15 @RestController
16 public class DemoController {
17     private final Logger logger = LoggerFactory.getLogger(getClass());
18 
19     @Autowired
20     private RestTemplate restTemplate;
21 
22     @GetMapping("/v1/id")
23     public ResponseEntity<String> getId() {
24         ResponseEntity<String> result = restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class);
25         String uuid = result.getBody();
26         logger.info("request id: {}", uuid);
27         return ResponseEntity.ok(uuid);
28     }
29 }

2、Ribbon 與負載均衡

① 負載均衡

負載均衡是指將負載分攤到多個執行單元上,負載均衡主要可以分為集中式負載均衡與行程內負載均衡:

  • 集中式負載均衡指位于因特網與執行單元之間,并負責把網路請求轉發到各個執行單元上,比如 Nginx、F5,集中式負載均衡也可以稱為服務端負載均衡,
  • 行程內負載均衡是將負載均衡邏輯集成到客戶端上,客戶端維護了一份服務提供者的實體串列,實體串列一般會從注冊中心比如 Eureka 中獲取,有了實體串列,就可以通過負載均衡策略將請求分攤給多個服務提供者,從而達到負載均衡的目的,行程內負載均衡一般也稱為客戶端負載均衡,

Ribbon 是一個客戶端負載均衡器,可以很好地控制 HTTP 和 TCP 客戶端的負載均衡行為,Ribbon 是 Netflix 公司開源的一個負載均衡組件,已經整合到 SpringCloud 生態中,它在 Spring Cloud 生態內是一個不可缺少的組件,少了它,服務便不能橫向擴展,

② Ribbon 模塊

Ribbon 有很多子模塊,官方檔案中說明,目前 Netflix 公司主要用于生產環境的 Ribbon 子模塊如下:

  • ribbon-loadbalancer:可以獨立使用或與其他模塊一起使用的負載均衡器 API,
  • ribbon-eureka:Ribbon 結合 Eureka 客戶端的 API,為負載均衡器提供動態服務注冊串列資訊,
  • ribbon-core:Ribbon 的核心API,

③ springcloud 與 ribbon 整合

與 eureka 整合到 springcloud 類似,springcloud 提供了對應的 spring-cloud-starter-netflix-eureka-client(server) 依賴包,ribbon 則整合到了 spring-cloud-starter-netflix-ribbon 中,一般也不需要單獨引入 ribbon 的依賴包,spring-cloud-starter-netflix-eureka-client 中已經依賴了 spring-cloud-starter-netflix-ribbon,因此我們引入了 spring-cloud-starter-netflix-eureka-client 就可以使用 Ribbon 的功能了,

④ Ribbon 與 RestTemplate 整合使用

在 Spring Cloud 構建的微服務系統中,Ribbon 作為服務消費者的負載均衡器,有兩種使用方式,一種是和 RestTemplate 相結合,另一種是和 Feign 相結合,前面已經演示了帶有負載均衡的 RestTemplate 的使用,下面用一張圖來看看 RestTemplate 基于 Ribbon 的遠程呼叫,

二、RestTemplate 負載均衡

1、@LoadBalanced 注解

以 RestTemplate 為切入點,來看 Ribbon 的負載均衡核心原理,那么首先就要先看看 @LoadBalanced 注解如何讓 RestTemplate 具備負載均衡的能力了,

首先看 @LoadBalanced 這個注解的定義,可以得到如下資訊:

  • 這個注解使用 @Qualifier 標記,其它地方就可以注入 LoadBalanced 注解的 bean 物件,
  • 從注釋中可以了解到,@LoadBalanced 標記的 RestTemplate 或 WebClient 將使用 LoadBalancerClient 來配置 bean 物件,
 1 /**
 2  * Annotation to mark a RestTemplate or WebClient bean to be configured to use a LoadBalancerClient.
 3  * @author Spencer Gibb
 4  */
 5 @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
 6 @Retention(RetentionPolicy.RUNTIME)
 7 @Documented
 8 @Inherited
 9 @Qualifier
10 public @interface LoadBalanced {
11 
12 }

注意 @LoadBalanced 是 spring-cloud-commons 模塊下 loadbalancer 包下的,

2、RestTemplate  負載均衡自動化配置

在 @LoadBalanced 同包下,有一個 LoadBalancerAutoConfiguration 自動化配置類,從注釋也可以看出,這是客戶端負載均衡 Ribbon 的自動化配置類,

從這個自動化配置類可以得到如下資訊:

  • 首先要有 RestTemplate 的依賴和定義了 LoadBalancerClient 物件的前提下才會觸發這個自動化配置類,這也對應了前面,RestTemplate 要用 LoadBalancerClient  來配置,
  • 接著可以看到這個類注入了帶有 @LoadBalanced 標識的 RestTemplate 物件,就是要對這部分物件增加負載均衡的能力,
  • 從 SmartInitializingSingleton 的構造中可以看到,就是在 bean 初始化完成后,用 RestTemplateCustomizer 定制化 RestTemplate,
  • 再往下可以看到,RestTemplateCustomizer 其實就是向 RestTemplate 中添加了 LoadBalancerInterceptor 這個攔截器,
  • 而 LoadBalancerInterceptor 的構建又需要 LoadBalancerClient 和 LoadBalancerRequestFactory,LoadBalancerRequestFactory 則通過 LoadBalancerClient 和 LoadBalancerRequestTransformer 構造完成,
 1 /**
 2  * Auto-configuration for Ribbon (client-side load balancing).
 3  */
 4 @Configuration(proxyBeanMethods = false)
 5 @ConditionalOnClass(RestTemplate.class) // 有 RestTemplate 的依賴
 6 @ConditionalOnBean(LoadBalancerClient.class) // 定義了 LoadBalancerClient 的 bean 物件
 7 @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
 8 public class LoadBalancerAutoConfiguration {
 9 
10     // 注入 @LoadBalanced 標記的 RestTemplate 物件
11     @LoadBalanced
12     @Autowired(required = false)
13     private List<RestTemplate> restTemplates = Collections.emptyList();
14 
15     @Autowired(required = false)
16     private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
17 
18     @Bean
19     public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
20             final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
21         return () -> restTemplateCustomizers.ifAvailable(customizers -> {
22             for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
23                 for (RestTemplateCustomizer customizer : customizers) {
24                     // 利用 RestTemplateCustomizer 定制化 restTemplate
25                     customizer.customize(restTemplate);
26                 }
27             }
28         });
29     }
30 
31     @Bean
32     @ConditionalOnMissingBean
33     public LoadBalancerRequestFactory loadBalancerRequestFactory(
34             LoadBalancerClient loadBalancerClient) {
35         return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
36     }
37 
38     @Configuration(proxyBeanMethods = false)
39     @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
40     static class LoadBalancerInterceptorConfig {
41 
42         // 創建 LoadBalancerInterceptor 需要 LoadBalancerClient 和 LoadBalancerRequestFactory
43         @Bean
44         public LoadBalancerInterceptor ribbonInterceptor(
45                 LoadBalancerClient loadBalancerClient,
46                 LoadBalancerRequestFactory requestFactory) {
47             return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
48         }
49 
50         @Bean
51         @ConditionalOnMissingBean
52         public RestTemplateCustomizer restTemplateCustomizer(
53                 final LoadBalancerInterceptor loadBalancerInterceptor) {
54             return restTemplate -> {
55                 List<ClientHttpRequestInterceptor> list = new ArrayList<>(
56                         restTemplate.getInterceptors());
57                 // 向 restTemplate 添加 LoadBalancerInterceptor 攔截器
58                 list.add(loadBalancerInterceptor);
59                 restTemplate.setInterceptors(list);
60             };
61         }
62 
63     }
64 }

3、RestTemplate 攔截器 LoadBalancerInterceptor

LoadBalancerAutoConfiguration 自動化配置主要就是給 RestTemplate 添加了一個負載均衡攔截器 LoadBalancerInterceptor,從 setInterceptors 的引數可以看出,攔截器的型別是 ClientHttpRequestInterceptor,如果我們想定制化 RestTemplate,就可以實作這個介面來定制化,然后還可以用 @Order 標記攔截器的先后順序,

1 public void setInterceptors(List<ClientHttpRequestInterceptor> interceptors) {
2     if (this.interceptors != interceptors) {
3         this.interceptors.clear();
4         this.interceptors.addAll(interceptors);
5         // 根據 @Order 注解的順序排序
6         AnnotationAwareOrderComparator.sort(this.interceptors);
7     }
8 }

interceptors 攔截器是在 RestTemplate 的父類 InterceptingHttpAccessor 中的, RestTemplate 的類結構如下圖所示,

從 restTemplate.getForEntity("http://demo-producer/v1/uuid", String.class) 這個GET請求進去看看,是如何使用 LoadBalancerInterceptor 的,一步步進去,可以看到最終是進入到 doExecute 這個方法了,

在 doExecute 方法中,首先根據 url、method 創建一個 ClientHttpRequest,然后利用 ClientHttpRequest 來發起請求,

 1 protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
 2         @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
 3     ClientHttpResponse response = null;
 4     try {
 5         // 創建一個 ClientHttpRequest
 6         ClientHttpRequest request = createRequest(url, method);
 7         if (requestCallback != null) {
 8             requestCallback.doWithRequest(request);
 9         }
10         // 呼叫 ClientHttpRequest 的 execute() 方法
11         response = request.execute();
12         // 處理回傳結果
13         handleResponse(url, method, response);
14         return (responseExtractor != null ? responseExtractor.extractData(response) : null);
15     }
16     catch (IOException ex) {
17         // ...
18     }
19     finally {
20         if (response != null) {
21             response.close();
22         }
23     }
24 }
25 
26 //////////////////////////////////////
27 
28 protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
29     ClientHttpRequest request = getRequestFactory().createRequest(url, method);
30     initialize(request);
31     if (logger.isDebugEnabled()) {
32         logger.debug("HTTP " + method.name() + " " + url);
33     }
34     return request;
35 }

InterceptingHttpAccessor 中重寫了父類 HttpAccessor 的 getRequestFactory 方法,父類默認的 requestFactory 是 SimpleClientHttpRequestFactory,

重寫后的 getRequestFactory 方法中,如果攔截器不為空,則基于父類默認的 SimpleClientHttpRequestFactory 和攔截器創建了 InterceptingClientHttpRequestFactory,

 1 public ClientHttpRequestFactory getRequestFactory() {
 2     List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
 3     if (!CollectionUtils.isEmpty(interceptors)) {
 4         ClientHttpRequestFactory factory = this.interceptingRequestFactory;
 5         if (factory == null) {
 6             // 傳入 SimpleClientHttpRequestFactory 和 ClientHttpRequestInterceptor 攔截器
 7             factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
 8             this.interceptingRequestFactory = factory;
 9         }
10         return factory;
11     }
12     else {
13         return super.getRequestFactory();
14     }
15 }

也就是說呼叫了 InterceptingClientHttpRequestFactory 的 createRequest 方法來創建 ClientHttpRequest,進去可以看到,ClientHttpRequest 的實際型別就是 InterceptingClientHttpRequest,

1 protected ClientHttpRequest createRequest(URI uri, HttpMethod httpMethod, ClientHttpRequestFactory requestFactory) {
2     return new InterceptingClientHttpRequest(requestFactory, this.interceptors, uri, httpMethod);
3 }

InterceptingClientHttpRequest 的類結構如下:

RestTemplate 的 doExecute 中呼叫 request.execute() 其實是呼叫了 InterceptingClientHttpRequest 父類 AbstractClientHttpRequest 中的 execute 方法,一步步進去可以發現最終其實是呼叫了 InterceptingClientHttpRequest 的 executeInternal 方法,

在 InterceptingClientHttpRequest  的 executeInternal 方法中,創建了 InterceptingRequestExecution 來執行請求,在 InterceptingRequestExecution 的 execute 方法中,會先遍歷執行所有攔截器,然后通過 ClientHttpRequest 發起真正的 http 請求,

 1 protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
 2     // 創建 InterceptingRequestExecution
 3     InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
 4     // 請求呼叫
 5     return requestExecution.execute(this, bufferedOutput);
 6 }
 7 
 8 private class InterceptingRequestExecution implements ClientHttpRequestExecution {
 9 
10     private final Iterator<ClientHttpRequestInterceptor> iterator;
11 
12     public InterceptingRequestExecution() {
13         // 攔截器迭代器
14         this.iterator = interceptors.iterator();
15     }
16 
17     @Override
18     public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
19         if (this.iterator.hasNext()) {
20             ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
21             // 利用攔截器攔截處理,并傳入 InterceptingRequestExecution
22             return nextInterceptor.intercept(request, body, this);
23         }
24         else {
25             // 攔截器遍歷完后開始發起真正的 http 請求
26             HttpMethod method = request.getMethod();
27             Assert.state(method != null, "No standard HTTP method");
28             ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
29             request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
30             if (body.length > 0) {
31                 if (delegate instanceof StreamingHttpOutputMessage) {
32                     StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
33                     streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
34                 }
35                 else {
36                     StreamUtils.copy(body, delegate.getBody());
37                 }
38             }
39             return delegate.execute();
40         }
41     }
42 }

進入到 LoadBalancerInterceptor 的 intercept 攔截方法內,可以看到從請求的原始地址中獲取了服務名稱,然后呼叫了 loadBalancer 的 execute 方法,也就是 LoadBalancerClient,

到這里,其實已經可以想象,loadBalancer.execute 這行代碼就是根據服務名稱去獲取一個具體的實體,然后將原始地址替換為實體的IP地址,那這個 loadBalancer 又是什么呢?

 1 public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException {
 2     // 原始地址:http://demo-producer/v1/uuid
 3     final URI originalUri = request.getURI();
 4     // host 就是服務名:demo-producer
 5     String serviceName = originalUri.getHost();
 6     Assert.state(serviceName != null,
 7             "Request URI does not contain a valid hostname: " + originalUri);
 8     return this.loadBalancer.execute(serviceName,
 9             this.requestFactory.createRequest(request, body, execution));
10 }

4、負載均衡客戶端 LoadBalancerClient

在配置 LoadBalancerInterceptor 時,需要兩個引數,LoadBalancerClient 和 LoadBalancerRequestFactory,LoadBalancerRequestFactory前面已經知道是如何創建的了,LoadBalancerClient 又是在哪創建的呢?通過 IDEA 搜索,可以發現是在 spring-cloud-netflix-ribbon 模塊下的 RibbonAutoConfiguration 中配置的,可以看到 LoadBalancerClient 的實際型別是 RibbonLoadBalancerClient,

配置類的順序是 EurekaClientAutoConfiguration、RibbonAutoConfiguration、LoadBalancerAutoConfiguration,因為使 RestTemplate 具備負載均衡的能力需要 LoadBalancerInterceptor 攔截器,創建 LoadBalancerInterceptor 又需要 LoadBalancerClient,而 LoadBalancerClient 底層要根據服務名獲取某個實體,肯定又需要一個實體庫,比如從組態檔、注冊中心獲取,從這里就可以看出來,RibbonLoadBalancerClient 默認會從 Eureka 注冊中心獲取實體,

 1 @Configuration
 2 @Conditional(RibbonAutoConfiguration.RibbonClassesConditions.class)
 3 @RibbonClients
 4 // 后于 EurekaClientAutoConfiguration 配置
 5 @AutoConfigureAfter(name = "org.springframework.cloud.netflix.eureka.EurekaClientAutoConfiguration")
 6 // 先于 LoadBalancerAutoConfiguration 配置
 7 @AutoConfigureBefore({ LoadBalancerAutoConfiguration.class,    AsyncLoadBalancerAutoConfiguration.class })
 8 @EnableConfigurationProperties({ RibbonEagerLoadProperties.class, ServerIntrospectorProperties.class })
 9 public class RibbonAutoConfiguration {
10 
11     @Autowired(required = false)
12     private List<RibbonClientSpecification> configurations = new ArrayList<>();
13 
14     @Bean
15     @ConditionalOnMissingBean
16     public SpringClientFactory springClientFactory() {
17         SpringClientFactory factory = new SpringClientFactory();
18         factory.setConfigurations(this.configurations);
19         return factory;
20     }
21 
22     @Bean
23     @ConditionalOnMissingBean(LoadBalancerClient.class)
24     public LoadBalancerClient loadBalancerClient() {
25         return new RibbonLoadBalancerClient(springClientFactory());
26     }
27 }

LoadBalancerClient 主要提供了三個介面:

 1 public interface LoadBalancerClient extends ServiceInstanceChooser {
 2 
 3     // 從 LoadBalancer 找一個 Server 來發送請求
 4     <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
 5 
 6     // 從傳入的 ServiceInstance 取 Server 來發送請求  
 7     <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException;
 8 
 9     // 對原始 URI 重構
10     URI reconstructURI(ServiceInstance instance, URI original);
11 }

 

進入到 RibbonLoadBalancerClient 的 execute 方法中可以看到:

  • 首先根據服務名獲取服務對應的負載均衡器 ILoadBalancer,
  • 然后從 ILoadBalancer 中根據一定策略選出一個實體 Server,
  • 然后將 server、serviceId 等資訊封裝到 RibbonServer 中,也就是一個服務實體 ServiceInstance,
  • 最后呼叫了 LoadBalancerRequest 的 apply,并傳入 ServiceInstance,將地址中的服務名替換為真實的IP地址,
 1 public <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException {
 2     return execute(serviceId, request, null);
 3 }
 4 
 5 public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
 6         throws IOException {
 7     // 根據服務名獲取一個負載均衡器 ILoadBalancer
 8     ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
 9     // 利用負載均衡器獲取實體 Server
10     Server server = getServer(loadBalancer, hint);
11     if (server == null) {
12         throw new IllegalStateException("No instances available for " + serviceId);
13     }
14     // 封裝實體資訊:RibbonServer 的父類是 ServiceInstance
15     RibbonServer ribbonServer = new RibbonServer(serviceId, server,
16             isSecure(server, serviceId),
17             serverIntrospector(serviceId).getMetadata(server));
18     return execute(serviceId, ribbonServer, request);
19 }
20 
21 @Override
22 public <T> T execute(String serviceId, ServiceInstance serviceInstance,
23         LoadBalancerRequest<T> request) throws IOException {
24     Server server = null;
25     if (serviceInstance instanceof RibbonServer) {
26         server = ((RibbonServer) serviceInstance).getServer();
27     }
28     if (server == null) {
29         throw new IllegalStateException("No instances available for " + serviceId);
30     }
31 
32     try {
33         // 處理地址,將服務名替換為真實的IP地址
34         T returnVal = request.apply(serviceInstance);
35         return returnVal;
36     } catch (Exception ex) {
37         // ...
38     }
39     return null;
40 }

這個 LoadBalancerRequest 其實就是 LoadBalancerInterceptor 的 intercept 中創建的一個匿名類,在它的函式式介面內,主要是用裝飾器 ServiceRequestWrapper 將 request 包了一層,

 1 public LoadBalancerRequest<ClientHttpResponse> createRequest(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) {
 2     return instance -> {
 3         // 封裝 HttpRequest,ServiceRequestWrapper 多載了 getURI 方法,
 4         HttpRequest serviceRequest = new ServiceRequestWrapper(request, instance, this.loadBalancer);
 5         if (this.transformers != null) {
 6             for (LoadBalancerRequestTransformer transformer : this.transformers) {
 7                 serviceRequest = transformer.transformRequest(serviceRequest, instance);
 8             }
 9         }
10         // 繼續執行攔截器
11         return execution.execute(serviceRequest, body);
12     };
13 }

ServiceRequestWrapper 主要就是重寫了 getURI 方法,在重寫的 getURI 方法內,它用 loadBalancer 對 URI 進行了重構,進去可以發現,就是將原始地址中的服務名替換為 Server 的真實IP、埠地址,

1 @Override
2 public URI getURI() {
3     // 重構 URI
4     URI uri = this.loadBalancer.reconstructURI(this.instance, getRequest().getURI());
5     return uri;
6 }
 1 public URI reconstructURI(ServiceInstance instance, URI original) {
 2     Assert.notNull(instance, "instance can not be null");
 3     // 服務名
 4     String serviceId = instance.getServiceId();
 5     RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);
 6 
 7     URI uri;
 8     Server server;
 9     if (instance instanceof RibbonServer) {
10         RibbonServer ribbonServer = (RibbonServer) instance;
11         server = ribbonServer.getServer();
12         uri = updateToSecureConnectionIfNeeded(original, ribbonServer);
13     }
14     else {
15         server = new Server(instance.getScheme(), instance.getHost(), instance.getPort());
16         IClientConfig clientConfig = clientFactory.getClientConfig(serviceId);
17         ServerIntrospector serverIntrospector = serverIntrospector(serviceId);
18         uri = updateToSecureConnectionIfNeeded(original, clientConfig, serverIntrospector, server);
19     }
20     // 重構地址
21     return context.reconstructURIWithServer(server, uri);
22 }

reconstructURIWithServer:

 1 public URI reconstructURIWithServer(Server server, URI original) {
 2     String host = server.getHost();
 3     int port = server.getPort();
 4     String scheme = server.getScheme();
 5     
 6     if (host.equals(original.getHost()) 
 7             && port == original.getPort()
 8             && scheme == original.getScheme()) {
 9         return original;
10     }
11     if (scheme == null) {
12         scheme = original.getScheme();
13     }
14     if (scheme == null) {
15         scheme = deriveSchemeAndPortFromPartialUri(original).first();
16     }
17 
18     try {
19         StringBuilder sb = new StringBuilder();
20         sb.append(scheme).append("://");
21         if (!Strings.isNullOrEmpty(original.getRawUserInfo())) {
22             sb.append(original.getRawUserInfo()).append("@");
23         }
24         sb.append(host);
25         if (port >= 0) {
26             sb.append(":").append(port);
27         }
28         sb.append(original.getRawPath());
29         if (!Strings.isNullOrEmpty(original.getRawQuery())) {
30             sb.append("?").append(original.getRawQuery());
31         }
32         if (!Strings.isNullOrEmpty(original.getRawFragment())) {
33             sb.append("#").append(original.getRawFragment());
34         }
35         URI newURI = new URI(sb.toString());
36         return newURI;            
37     } catch (URISyntaxException e) {
38         throw new RuntimeException(e);
39     }
40 }
View Code

5、RestTemplate 負載均衡總結

到這里,我們基本就弄清楚了一個簡單的 @LoadBalanced 注解如何讓 RestTemplate 具備了負載均衡的能力了,這一節來做個小結,

① RestTemplate 如何獲得負載均衡的能力

  • 1)首先 RestTemplate 是 spring-web 模塊下一個訪問第三方 RESTful API 介面的網路請求框架
  • 2)在 spring cloud 微服務架構中,用 @LoadBalanced 對 RestTemplate 做個標記,就可以使 RestTemplate 具備負載均衡的能力
  • 3)使 RestTemplate 具備負載均衡的核心組件就是 LoadBalancerAutoConfiguration 配置類中向其添加的 LoadBalancerInterceptor 負載均衡攔截器
  • 4)RestTemplate 在發起 http 呼叫前,會遍歷所有攔截器來對 RestTemplate 定制化,LoadBalancerInterceptor 就是在這時將URI中的服務名替換為實體的真實IP地址,定制完成后,就會發起真正的 http 請求,
  • 5)LoadBalancerInterceptor 又主要是使用負載均衡客戶端 LoadBalancerClient 來完成URI的重構的,LoadBalancerClient 就可以根據服務名查找一個可用的實體,然后重構URI,

② 核心組件

這里會涉及多個模塊,下面是核心組件的所屬模塊:

spring-web:

  • RestTemplate
  • InterceptingClientHttpRequest:執行攔截器,并發起最終http呼叫

spring-cloud-commons:

  • @LoadBalanced
  • LoadBalancerAutoConfiguration
  • LoadBalancerRequestFactory:創建裝飾類 ServiceRequestWrapper 替換原來的 HttpRequest,多載 getURI 方法,
  • LoadBalancerInterceptor:負載均衡攔截器
  • LoadBalancerClient:負載均衡客戶端介面

spring-cloud-netflix-ribbon:

  • RibbonLoadBalancerClient:LoadBalancerClient 的實作類,Ribbon 的負載均衡客戶端
  • RibbonAutoConfiguration

ribbon-loadbalancer:

  • ILoadBalancer:負載均衡器
  • Server:實體

③ 最后再用一張圖把 RestTemplate 這塊的關系捋一下

三、ILoadBalancer 獲取 Server

從前面 RestTemplate 那張圖可以看出,使 RestTemplate 具備負載均衡的能力,最重要的一個組件之一就是 ILoadBalancer,因為要用它來獲取能呼叫的 Server,有了 Server 才能對原始帶有服務名的 URI 進行重構,這節就來看下 Ribbon 的負載均衡器 ILoadBalancer 是如何創建的以及如何通過它獲取 Server,

1、創建負載均衡器 ILoadBalancer

① SpringClientFactory與背景關系

ILoadBalancer 是用 SpringClientFactory 的 getLoadBalancer 方法根據服務名獲取的,從 getInstance 一步步進去可以發現,每個服務都會創建一個 AnnotationConfigApplicationContext,也就是一個應用背景關系 ApplicationContext,相當于就是一個服務系結一個 ILoadBalancer,

1 public <C> C getInstance(String name, Class<C> type) {
2     C instance = super.getInstance(name, type);
3     if (instance != null) {
4         return instance;
5     }
6     IClientConfig config = getInstance(name, IClientConfig.class);
7     return instantiateWithConfig(getContext(name), type, config);
8 }
1 public <T> T getInstance(String name, Class<T> type) {
2     // 根據名稱獲取
3     AnnotationConfigApplicationContext context = getContext(name);
4     if (BeanFactoryUtils.beanNamesForTypeIncludingAncestors(context, type).length > 0) {
5         return context.getBean(type);
6     }
7     return null;
8 }
 1 protected AnnotationConfigApplicationContext getContext(String name) {
 2     // contexts => Map<String, AnnotationConfigApplicationContext>
 3     if (!this.contexts.containsKey(name)) {
 4         synchronized (this.contexts) {
 5             if (!this.contexts.containsKey(name)) {
 6                 this.contexts.put(name, createContext(name));
 7             }
 8         }
 9     }
10     return this.contexts.get(name);
11 }

除錯看下 AnnotationConfigApplicationContext 背景關系,可以看到放入了與這個服務系結的 ILoadBalancer、IClientConfig、RibbonLoadBalancerContext 等,

它這里為什么要每個服務都系結一個 ApplicationContext 呢?我猜想應該是因為服務實體串列可以有多個來源,比如可以從 eureka 注冊中心獲取、可以通過代碼配置、可以通過組態檔配置,另外每個服務還可以有很多個性化的配置,有默認的配置、定制的全域配置、個別服務的特定配置等,它這樣做就便于用戶定制每個服務的負載均衡策略,

② Ribbon的饑餓加載

而且這個Ribbon客戶端的應用背景關系默認是懶加載的,并不是在啟動的時候就加載背景關系,而是在第一次呼叫的時候才會去初始化,

如果想服務啟動時就初始化,可以指定Ribbon客戶端的具體名稱,在啟動的時候就加載配置項的背景關系:

1 ribbon:
2   eager-load:
3     enabled: true
4     clients: demo-producer,demo-xxx

③ RibbonClientConfiguration

ILoadBalancer 的創建在哪呢?看 RibbonClientConfiguration,這個配置類提供了 ILoadBalancer 的默認創建方法,ILoadBalancer 的默認實作類為 ZoneAwareLoadBalancer,

 1 public class RibbonClientConfiguration {
 2 
 3     public static final int DEFAULT_CONNECT_TIMEOUT = 1000;
 4 
 5     public static final int DEFAULT_READ_TIMEOUT = 1000;
 6 
 7     public static final boolean DEFAULT_GZIP_PAYLOAD = true;
 8 
 9     @RibbonClientName
10     private String name = "client";
11 
12     @Autowired
13     private PropertiesFactory propertiesFactory;
14 
15     @Bean
16     @ConditionalOnMissingBean
17     public IClientConfig ribbonClientConfig() {
18         DefaultClientConfigImpl config = new DefaultClientConfigImpl();
19         config.loadProperties(this.name);
20         config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
21         config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
22         config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
23         return config;
24     }
25 
26     @Bean
27     @ConditionalOnMissingBean
28     public IRule ribbonRule(IClientConfig config) {
29         if (this.propertiesFactory.isSet(IRule.class, name)) {
30             return this.propertiesFactory.get(IRule.class, config, name);
31         }
32         ZoneAvoidanceRule rule = new ZoneAvoidanceRule();
33         rule.initWithNiwsConfig(config);
34         return rule;
35     }
36 
37     @Bean
38     @ConditionalOnMissingBean
39     public IPing ribbonPing(IClientConfig config) {
40         if (this.propertiesFactory.isSet(IPing.class, name)) {
41             return this.propertiesFactory.get(IPing.class, config, name);
42         }
43         return new DummyPing();
44     }
45 
46     @Bean
47     @ConditionalOnMissingBean
48     @SuppressWarnings("unchecked")
49     public ServerList<Server> ribbonServerList(IClientConfig config) {
50         if (this.propertiesFactory.isSet(ServerList.class, name)) {
51             return this.propertiesFactory.get(ServerList.class, config, name);
52         }
53         ConfigurationBasedServerList serverList = new ConfigurationBasedServerList();
54         serverList.initWithNiwsConfig(config);
55         return serverList;
56     }
57 
58     @Bean
59     @ConditionalOnMissingBean
60     public ServerListUpdater ribbonServerListUpdater(IClientConfig config) {
61         return new PollingServerListUpdater(config);
62     }
63 
64     @Bean
65     @ConditionalOnMissingBean
66     @SuppressWarnings("unchecked")
67     public ServerListFilter<Server> ribbonServerListFilter(IClientConfig config) {
68         if (this.propertiesFactory.isSet(ServerListFilter.class, name)) {
69             return this.propertiesFactory.get(ServerListFilter.class, config, name);
70         }
71         ZonePreferenceServerListFilter filter = new ZonePreferenceServerListFilter();
72         filter.initWithNiwsConfig(config);
73         return filter;
74     }
75 
76     @Bean
77     @ConditionalOnMissingBean
78     public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
79             ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
80             IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
81         // 先判斷組態檔中是否配置了負載均衡器
82         if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
83             // 通過反射創建
84             return this.propertiesFactory.get(ILoadBalancer.class, config, name);
85         }
86         return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
87                 serverListFilter, serverListUpdater);
88     }
89 }

可以看到創建 ILoadBalancer 需要 IClientConfig、ServerList<Server>、ServerListFilter<Server>、IRule、IPing、ServerListUpdater,其實這6個介面加上 ILoadBalancer 就是 Ribbon 的核心介面,它們共同定義了 Ribbon 的行為特性,

這7個核心介面和默認實作類如下:

2、客戶端 Ribbon 定制

可以看到在 RibbonClientConfiguration 中創建 IRule、IPing、ServerList<Server>、ServerListFilter<Server>、ILoadBalancer 時,都先通過 propertiesFactory.isSet 判斷是否已配置了對應型別的實作類,沒有才使用默認的實作類,

也就是說針對特定的服務,這幾個類可以自行定制化,也可以通過配置指定其它的實作類,

① 全域策略配置

如果想要全域更改配置,需要加一個配置類,比如像下面這樣:

 1 @Configuration
 2 public class GlobalRibbonConfiguration {
 3 
 4     @Bean
 5     public IRule ribbonRule() {
 6         return new RandomRule();
 7     }
 8 
 9     @Bean
10     public IPing ribbonPing() {
11         return new NoOpPing();
12     }
13 }

② 基于注解的配置

如果想針對某一個服務定制配置,可以通過 @RibbonClients 來配置特定服務的配置類,

需要先定義一個服務配置類:

 1 @Configuration
 2 public class ProducerRibbonConfiguration {
 3 
 4     @Bean
 5     public IRule ribbonRule() {
 6         return new RandomRule();
 7     }
 8 
 9     @Bean
10     public IPing ribbonPing() {
11         return new NoOpPing();
12     }
13 }

用 @RibbonClients 注解為服務指定特定的配置類,并排除掉,不讓 Spring 掃描,否則就變成了全域配置了,

1 @RibbonClients({
2     @RibbonClient(name = "demo-producer", configuration = ProducerRibbonConfiguration.class)
3 })
4 @ComponentScan(excludeFilters = {
5     @ComponentScan.Filter(type = FilterType.ASSIGNABLE_TYPE, classes = ProducerRibbonConfiguration.class)
6 })

③ 組態檔配置

通過組態檔的方式來配置,配置的格式就是 <服務名稱>.ribbon.<屬性>:

 1 demo-producer:
 2   ribbon:
 3     # ILoadBalancer
 4     NFLoadBalancerClassName: com.netflix.loadbalancer.NoOpLoadBalancer
 5     # IRule
 6     NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
 7     # IPing
 8     NFLoadBalancerPingClassName:
 9     # ServerList<Server>
10     NIWSServerListClassName:
11     # ServerListFilter<Server>
12     NIWSServerListFilterClassName:

④ 優先級順序

這幾種配置方式的優先級順序是 組態檔配置 > @RibbonClients 配置 > 全域配置 > 默認配置,

3、ZoneAwareLoadBalancer 選擇 Server

獲取到 ILoadBalancer 后,就要去獲取 Server 了,可以看到,就是用 ILoadBalancer 來獲取 Server,

1 protected Server getServer(ILoadBalancer loadBalancer, Object hint) {
2     if (loadBalancer == null) {
3         return null;
4     }
5     // Use 'default' on a null hint, or just pass it on?
6     return loadBalancer.chooseServer(hint != null ? hint : "default");
7 }

ILoadBalancer  的默認實作類是 ZoneAwareLoadBalancer,進入它的 chooseServer 方法內,如果只配置了一個 zone,就走父類的 chooseServer,否則從多個 zone 中去選擇實體,

 1 public Server chooseServer(Object key) {
 2     // ENABLED => ZoneAwareNIWSDiscoveryLoadBalancer.enabled 默認 true
 3     // AvailableZones 配置的只有一個 defaultZone
 4     if (!ENABLED.get() || getLoadBalancerStats().getAvailableZones().size() <= 1) {
 5         logger.debug("Zone aware logic disabled or there is only one zone");
 6         // 走父類獲取 Server 的邏輯
 7         return super.chooseServer(key);
 8     }
 9     
10     // 多 zone 邏輯....
11 }

先看下 ZoneAwareLoadBalancer 的類繼承結構,ZoneAwareLoadBalancer 的直接父類是 DynamicServerListLoadBalancer,DynamicServerListLoadBalancer 的父類又是 BaseLoadBalancer,

ZoneAwareLoadBalancer 呼叫父類的 chooseServer 方法是在 BaseLoadBalancer 中的,進去可以看到,它主要是用 IRule 來選擇實體,最終選擇實體的策略就交給了 IRule 介面,

 1 public Server chooseServer(Object key) {
 2     if (counter == null) {
 3         counter = createCounter();
 4     }
 5     counter.increment();
 6     if (rule == null) {
 7         return null;
 8     } else {
 9         try {
10             // IRule
11             return rule.choose(key);
12         } catch (Exception e) {
13             logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);
14             return null;
15         }
16     }
17 }

4、ZoneAvoidanceRule 斷言篩選、輪詢選擇 Server

IRule 的默認實作類是 ZoneAvoidanceRule,先看下 ZoneAvoidanceRule 的繼承結構,ZoneAvoidanceRule 的直接父類是 PredicateBasedRule,

rule.choose 的邏輯在 PredicateBasedRule 中,getPredicate() 回傳的是 ZoneAvoidanceRule 創建的一個組合斷言 CompositePredicate,就是用這個斷言來過濾出可用的 Server,并通過輪詢的策略回傳一個 Server,

 1 public Server choose(Object key) {
 2     ILoadBalancer lb = getLoadBalancer();
 3     // getPredicate() Server斷言 => CompositePredicate
 4     // RoundRobin 輪詢方式獲取實體
 5     Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);
 6     if (server.isPresent()) {
 7         return server.get();
 8     } else {
 9         return null;
10     }       
11 }

在初始化 ZoneAvoidanceRule 配置時,創建了 CompositePredicate,可以看到這個組合斷言主要有兩個斷言,一個是斷言 Server 的 zone 是否可用,一個斷言 Server 本身是否可用,例如 Server 無法 ping 通,

 1 public void initWithNiwsConfig(IClientConfig clientConfig) {
 2     // 斷言 Server 的 zone 是否可用,只有一個 defaultZone 的情況下都是可用的
 3     ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this, clientConfig);
 4     // 斷言 Server 是否可用
 5     AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this, clientConfig);
 6     // 封裝組合斷言
 7     compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);
 8 }
 9 
10 private CompositePredicate createCompositePredicate(ZoneAvoidancePredicate p1, AvailabilityPredicate p2) {
11     // 建造者模式創建斷言
12     return CompositePredicate.withPredicates(p1, p2)
13                          .addFallbackPredicate(p2)
14                          .addFallbackPredicate(AbstractServerPredicate.alwaysTrue())
15                          .build();
16     
17 }

接著看選擇Server的 chooseRoundRobinAfterFiltering,引數 servers 是通過 ILoadBalancer 獲取的所有實體,可以看到它其實就是回傳了 ILoadBalancer 在記憶體中快取的服務所有 Server,這個 Server 從哪來的我們后面再來看,

1 public List<Server> getAllServers() {
2     // allServerList => List<Server>
3     return Collections.unmodifiableList(allServerList);
4 }

先對所有實體通過斷言過濾掉不可用的 Server,然后是通過輪詢的方式獲取一個 Server 回傳,這就是默認配置下 ILoadBalancer(ZoneAwareLoadBalancer) 通過 IRule(ZoneAvoidanceRule) 選擇 Server 的流程了,

 1 public Optional<Server> chooseRoundRobinAfterFiltering(List<Server> servers, Object loadBalancerKey) {
 2     // 斷言獲取可用的 Server
 3     List<Server> eligible = getEligibleServers(servers, loadBalancerKey);
 4     if (eligible.size() == 0) {
 5         return Optional.absent();
 6     }
 7     // 通過取模的方式輪詢 Server
 8     return Optional.of(eligible.get(incrementAndGetModulo(eligible.size())));
 9 }
10 
11 public List<Server> getEligibleServers(List<Server> servers, Object loadBalancerKey) {
12     if (loadBalancerKey == null) {
13         return ImmutableList.copyOf(Iterables.filter(servers, this.getServerOnlyPredicate()));            
14     } else {
15         List<Server> results = Lists.newArrayList();
16         // 對每個 Server 斷言
17         for (Server server: servers) {
18             if (this.apply(new PredicateKey(loadBalancerKey, server))) {
19                 results.add(server);
20             }
21         }
22         return results;            
23     }
24 }
25 
26 private int incrementAndGetModulo(int modulo) {
27     for (;;) {
28         int current = nextIndex.get();
29         // 模運算取余數
30         int next = (current + 1) % modulo;
31         // CAS 更新 nextIndex
32         if (nextIndex.compareAndSet(current, next) && current < modulo)
33             return current;
34     }
35 }

四、Ribbon 整合 Eureka Client 拉取Server串列

前面在通過 IRule 選擇 Server 的時候,首先通過 lb.getAllServers() 獲取了所有的 Server,那這些 Server 從哪里來的呢,這節就來看下,

1、ILoadBalancer 初始化

ILoadBalancer 的默認實作類是 ZoneAwareLoadBalancer,先從 ZoneAwareLoadBalancer 的構造方法進去看看都做了些什么事情,

 1 @Bean
 2 @ConditionalOnMissingBean
 3 public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
 4         ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
 5         IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
 6     if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
 7         return this.propertiesFactory.get(ILoadBalancer.class, config, name);
 8     }
 9     return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
10             serverListFilter, serverListUpdater);
11 }

可以看到,ZoneAwareLoadBalancer 直接呼叫了父類 DynamicServerListLoadBalancer 的構造方法,DynamicServerListLoadBalancer 先呼叫父類 BaseLoadBalancer 初始化,然后又做了一些剩余的初始化作業,

 1 public ZoneAwareLoadBalancer(IClientConfig clientConfig, IRule rule,
 2                              IPing ping, ServerList<T> serverList, ServerListFilter<T> filter,
 3                              ServerListUpdater serverListUpdater) {
 4     // DynamicServerListLoadBalancer
 5     super(clientConfig, rule, ping, serverList, filter, serverListUpdater);
 6 }
 7 
 8 public DynamicServerListLoadBalancer(IClientConfig clientConfig, IRule rule, IPing ping,
 9                                      ServerList<T> serverList, ServerListFilter<T> filter,
10                                      ServerListUpdater serverListUpdater) {
11     // BaseLoadBalancer
12     super(clientConfig, rule, ping);
13     this.serverListImpl = serverList;
14     this.filter = filter;
15     this.serverListUpdater = serverListUpdater;
16     if (filter instanceof AbstractServerListFilter) {
17         ((AbstractServerListFilter) filter).setLoadBalancerStats(getLoadBalancerStats());
18     }
19     // 剩余的一些初始化
20     restOfInit(clientConfig);
21 }
22 
23 public BaseLoadBalancer(IClientConfig config, IRule rule, IPing ping) {
24     // createLoadBalancerStatsFromConfig => LoadBalancerStats 統計
25     initWithConfig(config, rule, ping, createLoadBalancerStatsFromConfig(config));
26 

看 BaseLoadBalancer 的 initWithConfig,主要做了如下的初始化:

  • 設定 IPing 和 IRule,ping 的間隔時間是 30 秒,setPing 會啟動一個后臺定時任務,然后每隔30秒運行一次 PingTask 任務,
  • 設定了 ILoadBalancer 的 統計器 LoadBalancerStats,對 ILoadBalancer 的 Server 狀態進行統計,比如連接失敗、成功、熔斷等資訊,
  • 在啟用 PrimeConnections 請求預熱的情況下,創建 PrimeConnections 來預熱客戶端 與 Server 的鏈接,默認是關閉的,
  • 最后是注冊了一些監控、開啟請求預熱,
 1 void initWithConfig(IClientConfig clientConfig, IRule rule, IPing ping, LoadBalancerStats stats) {
 2     this.config = clientConfig;
 3     String clientName = clientConfig.getClientName();
 4     this.name = clientName;
 5     // ping 間隔時間,默認30秒
 6     int pingIntervalTime = Integer.parseInt(""
 7             + clientConfig.getProperty(
 8                     CommonClientConfigKey.NFLoadBalancerPingInterval,
 9                     Integer.parseInt("30")));
10     // 沒看到用的地方                
11     int maxTotalPingTime = Integer.parseInt(""
12             + clientConfig.getProperty(
13                     CommonClientConfigKey.NFLoadBalancerMaxTotalPingTime,
14                     Integer.parseInt("2")));
15     // 設定 ping 間隔時間,并重新設定了 ping 任務
16     setPingInterval(pingIntervalTime);
17     setMaxTotalPingTime(maxTotalPingTime);
18 
19     // 設定 IRule、IPing
20     setRule(rule);
21     setPing(ping);
22 
23     setLoadBalancerStats(stats);
24     rule.setLoadBalancer(this);
25     if (ping instanceof AbstractLoadBalancerPing) {
26         ((AbstractLoadBalancerPing) ping).setLoadBalancer(this);
27     }
28     logger.info("Client: {} instantiated a LoadBalancer: {}", name, this);
29     
30     // PrimeConnections,請求預熱,默認關閉
31     // 作用主要用于解決那些部署環境(如讀EC2)在實際使用實時請求之前,從防火墻連接/路徑進行預熱(比如先加白名單、初始化等等動作比較耗時,可以用它先去打通),
32     boolean enablePrimeConnections = clientConfig.get(
33             CommonClientConfigKey.EnablePrimeConnections, DefaultClientConfigImpl.DEFAULT_ENABLE_PRIME_CONNECTIONS);
34     if (enablePrimeConnections) {
35         this.setEnablePrimingConnections(true);
36         PrimeConnections primeConnections = new PrimeConnections(
37                 this.getName(), clientConfig);
38         this.setPrimeConnections(primeConnections);
39     }
40     // 注冊一些監控
41     init();
42 }
43 
44 protected void init() {
45     Monitors.registerObject("LoadBalancer_" + name, this);
46     // register the rule as it contains metric for available servers count
47     Monitors.registerObject("Rule_" + name, this.getRule());
48     // 默認關閉
49     if (enablePrimingConnections && primeConnections != null) {
50         primeConnections.primeConnections(getReachableServers());
51     }
52 }

再看下 DynamicServerListLoadBalancer 的初始化,核心的初始化邏輯在 restOfInit 中,主要就是做了兩件事情:

  • 開啟動態更新 Server 的特性,比如實體上線、下線、故障等,要能夠更新 ILoadBalancer 的 Server 串列,
  • 然后就全量更新一次本地的 Server 串列,
 1 void restOfInit(IClientConfig clientConfig) {
 2     boolean primeConnection = this.isEnablePrimingConnections();
 3     // turn this off to avoid duplicated asynchronous priming done in BaseLoadBalancer.setServerList()
 4     this.setEnablePrimingConnections(false);
 5     
 6     // 開啟動態更新 Server 的特性
 7     enableAndInitLearnNewServersFeature();
 8 
 9     // 更新 Server 串列
10     updateListOfServers();
11     
12     // 開啟請求預熱的情況下,對可用的 Server 進行預熱
13     if (primeConnection && this.getPrimeConnections() != null) {
14         this.getPrimeConnections()
15                 .primeConnections(getReachableServers());
16     }
17     this.setEnablePrimingConnections(primeConnection);
18     LOGGER.info("DynamicServerListLoadBalancer for client {} initialized: {}", clientConfig.getClientName(), this.toString());
19 }

2、全量更新Server串列

先看下 updateListOfServers() 是如何更新 Server 串列的,進而看下 ILoadBalancer 是如何存盤 Server 的,

  • 首先使用 ServerList 獲取所有的 Server 串列,在 RibbonClientConfiguration 中配置的是 ConfigurationBasedServerList,但和 eureka 集合和,就不是 ConfigurationBasedServerList 了,這塊下一節再來看,
  • 然后使用 ServerListFilter 對 Server 串列過濾,其默認實作類是 ZonePreferenceServerListFilter,它主要是過濾出當前 Zone(defaultZone)下的 Server,
  • 最后就是更新所有 Server 串列,先是設定 Server alive,然后呼叫父類(BaseLoadBalancer)的 setServersList 來更新Server串列,這說明 Server 是存盤在 BaseLoadBalancer 里的,
 1 public void updateListOfServers() {
 2     List<T> servers = new ArrayList<T>();
 3     if (serverListImpl != null) {
 4         // 從 ServerList 獲取所有 Server 串列
 5         servers = serverListImpl.getUpdatedListOfServers();
 6         LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
 7 
 8         if (filter != null) {
 9             // 用 ServerListFilter 過濾 Server
10             servers = filter.getFilteredListOfServers(servers);
11             LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers);
12         }
13     }
14     // 更新所有 Server 到本地快取
15     updateAllServerList(servers);
16 }
17 
18 protected void updateAllServerList(List<T> ls) {
19     if (serverListUpdateInProgress.compareAndSet(false, true)) {
20         try {
21             for (T s : ls) {
22                 s.setAlive(true); // 設定 Server alive
23             }
24             setServersList(ls);
25             // 強制初始化 Ping
26             super.forceQuickPing();
27         } finally {
28             serverListUpdateInProgress.set(false);
29         }
30     }
31 }
32 
33 public void setServersList(List lsrv) {
34     // BaseLoadBalancer
35     super.setServersList(lsrv);
36     
37     // 將 Server 更新到 LoadBalancerStats 統計中 ....
38 }

接著看父類的 setServersList,可以看出,存盤所有 Server 的資料結構 allServerList 是一個加了 synchronized 的執行緒安全的容器,setServersList 就是直接將得到的 Server 串列替換  allServerList,

 1 public void setServersList(List lsrv) {
 2     Lock writeLock = allServerLock.writeLock();
 3     ArrayList<Server> newServers = new ArrayList<Server>();
 4     // 加寫鎖
 5     writeLock.lock();
 6     try {
 7         // for 回圈將 lsrv 中的 Server 轉移到 allServers
 8         ArrayList<Server> allServers = new ArrayList<Server>();
 9         for (Object server : lsrv) {
10             if (server == null) {
11                 continue;
12             }
13             if (server instanceof String) {
14                 server = new Server((String) server);
15             }
16             if (server instanceof Server) {
17                 logger.debug("LoadBalancer [{}]:  addServer [{}]", name, ((Server) server).getId());
18                 allServers.add((Server) server);
19             } else {
20                 throw new IllegalArgumentException("Type String or Server expected, instead found:" + server.getClass());
21             }
22         }
23         
24         boolean listChanged = false;
25         // allServerList => volatile List<Server> allServerList = Collections.synchronizedList(new ArrayList<Server>())
26         if (!allServerList.equals(allServers)) {
27             listChanged = true;
28             // 服務串列變更監聽器 ServerListChangeListener, 發出服務變更通知...
29         }
30         
31         // 啟用了服務預熱,開始 Server 預熱...
32         
33         // 直接替換
34         allServerList = allServers;
35         if (canSkipPing()) {
36             for (Server s : allServerList) {
37                 s.setAlive(true);
38             }
39             upServerList = allServerList;
40         } else if (listChanged) {
41             forceQuickPing();
42         }
43     } finally {
44         // 釋放寫鎖
45         writeLock.unlock();
46     }
47 }

前面 chooseRoundRobinAfterFiltering 獲取所有 Server 時就是回傳的這個 allServerList串列,

1 public List<Server> getAllServers() {
2     return Collections.unmodifiableList(allServerList);
3 }

3、Eureka Ribbon 客戶端配置

獲取 Server 的組件是 ServerList,RibbonClientConfiguration 中配置的默認實作類是 ConfigurationBasedServerList,ConfigurationBasedServerList 默認是從組態檔中獲取,可以像下面這樣配置服務實體地址,多個 Server 地址用逗號隔開,

1 demo-producer:
2   ribbon:
3     listOfServers: http://10.215.0.92:8010,http://10.215.0.92:8011

但是和 eureka-client 結合后,也就是引入 spring-cloud-starter-netflix-eureka-client 的客戶端依賴,它會幫我們引入 spring-cloud-netflix-eureka-client 依賴,這個包中有一個 RibbonEurekaAutoConfiguration 自動化配置類,它通過 @RibbonClients 注解定義了全域的 Ribbon 客戶端配置類 為 EurekaRibbonClientConfiguration

1 @Configuration(proxyBeanMethods = false)
2 @EnableConfigurationProperties
3 @ConditionalOnRibbonAndEurekaEnabled
4 @AutoConfigureAfter(RibbonAutoConfiguration.class)
5 @RibbonClients(defaultConfiguration = EurekaRibbonClientConfiguration.class)
6 public class RibbonEurekaAutoConfiguration {
7 
8 }

進入 EurekaRibbonClientConfiguration  可以看到:

  • IPing 的默認實作類為 NIWSDiscoveryPing,
  • ServerList 的默認實作類為 DomainExtractingServerList,但是 DomainExtractingServerList 在構造時又傳入了一個型別為 DiscoveryEnabledNIWSServerList 的 ServerList,看名字大概也可以看出,DiscoveryEnabledNIWSServerList 就是從 EurekaClient 獲取 Server 的組件,
 1 @Configuration(proxyBeanMethods = false)
 2 public class EurekaRibbonClientConfiguration {
 3     @Value("${ribbon.eureka.approximateZoneFromHostname:false}")
 4     private boolean approximateZoneFromHostname = false;
 5 
 6     @RibbonClientName
 7     private String serviceId = "client";
 8     @Autowired
 9     private PropertiesFactory propertiesFactory;
10 
11     @Bean
12     @ConditionalOnMissingBean
13     public IPing ribbonPing(IClientConfig config) {
14         if (this.propertiesFactory.isSet(IPing.class, serviceId)) {
15             return this.propertiesFactory.get(IPing.class, config, serviceId);
16         }
17         NIWSDiscoveryPing ping = new NIWSDiscoveryPing();
18         ping.initWithNiwsConfig(config);
19         return ping;
20     }
21 
22     @Bean
23     @ConditionalOnMissingBean
24     public ServerList<?> ribbonServerList(IClientConfig config,
25             Provider<EurekaClient> eurekaClientProvider) {
26         if (this.propertiesFactory.isSet(ServerList.class, serviceId)) {
27             return this.propertiesFactory.get(ServerList.class, config, serviceId);
28         }
29         DiscoveryEnabledNIWSServerList discoveryServerList = new DiscoveryEnabledNIWSServerList(config, eurekaClientProvider);
30         DomainExtractingServerList serverList = new DomainExtractingServerList(discoveryServerList, config, this.approximateZoneFromHostname);
31         return serverList;
32     }
33 }

4、從 DiscoveryClient 獲取Server串列

DynamicServerListLoadBalancer 中通過 ServerList 的 getUpdatedListOfServers 方法全量獲取服務串列,在 eureka-client 環境下,ServerList 默認實作類為 DomainExtractingServerList,那就先看下它的 getUpdatedListOfServers 方法,

可以看出,DomainExtractingServerList 先用 DomainExtractingServerList 獲取服務串列,然后根據 Ribbon 客戶端配置重新構造 Server 物件回傳,獲取服務串列的核心在 DiscoveryEnabledNIWSServerList 中,

 1 @Override
 2 public List<DiscoveryEnabledServer> getUpdatedListOfServers() {
 3     // list => DiscoveryEnabledNIWSServerList
 4     List<DiscoveryEnabledServer> servers = setZones(this.list.getUpdatedListOfServers());
 5     return servers;
 6 }
 7 
 8 private List<DiscoveryEnabledServer> setZones(List<DiscoveryEnabledServer> servers) {
 9     List<DiscoveryEnabledServer> result = new ArrayList<>();
10     boolean isSecure = this.ribbon.isSecure(true);
11     boolean shouldUseIpAddr = this.ribbon.isUseIPAddrForServer();
12     // 根據客戶端配置重新構造 DomainExtractingServer 回傳
13     for (DiscoveryEnabledServer server : servers) {
14         result.add(new DomainExtractingServer(server, isSecure, shouldUseIpAddr, this.approximateZoneFromHostname));
15     }
16     return result;
17 }

先看下 DiscoveryEnabledNIWSServerList  的構造初始化:

  • 主要是傳入了 Provider<EurekaClient> 用來獲取 EurekaClient
  • 另外還設定了客戶端名稱 clientName ,以及 vipAddresses 也是客戶端名稱,這個后面會用得上,
 1 public DiscoveryEnabledNIWSServerList(IClientConfig clientConfig, Provider<EurekaClient> eurekaClientProvider) {
 2     this.eurekaClientProvider = eurekaClientProvider;
 3     initWithNiwsConfig(clientConfig);
 4 }
 5 
 6 @Override
 7 public void initWithNiwsConfig(IClientConfig clientConfig) {
 8     // 客戶端名稱,就是服務名稱
 9     clientName = clientConfig.getClientName();
10     // vipAddresses 得到的也是客戶端名稱
11     vipAddresses = clientConfig.resolveDeploymentContextbasedVipAddresses();
12     
13     // 其它的一些配置....
14 }

接著看獲取實體的 getUpdatedListOfServers,可以看到它的核心邏輯就是根據服務名從 EurekaClient 獲取 InstanceInfo 實體串列,然后封裝 Server 資訊回傳,

 1 public List<DiscoveryEnabledServer> getUpdatedListOfServers(){
 2     return obtainServersViaDiscovery();
 3 }
 4 
 5 private List<DiscoveryEnabledServer> obtainServersViaDiscovery() {
 6     List<DiscoveryEnabledServer> serverList = new ArrayList<DiscoveryEnabledServer>();
 7     if (eurekaClientProvider == null || eurekaClientProvider.get() == null) {
 8         return new ArrayList<DiscoveryEnabledServer>();
 9     }
10     // 得到 EurekaClient,實際型別是 CloudEurekaClient,其父類是 DiscoveryClient
11     EurekaClient eurekaClient = eurekaClientProvider.get();
12     if (vipAddresses!=null){
13         // 分割 vipAddresses,默認就是服務名稱
14         for (String vipAddress : vipAddresses.split(",")) {
15             // 根據服務名稱從 EurekaClient 獲取實體資訊
16             List<InstanceInfo> listOfInstanceInfo = eurekaClient.getInstancesByVipAddress(vipAddress, isSecure, targetRegion);
17             for (InstanceInfo ii : listOfInstanceInfo) {
18                 if (ii.getStatus().equals(InstanceStatus.UP)) {
19                     // ...
20                     // 根據實體資訊 InstanceInfo 創建 Server
21                     DiscoveryEnabledServer des = createServer(ii, isSecure, shouldUseIpAddr);
22                     serverList.add(des);
23                 }
24             }
25             if (serverList.size()>0 && prioritizeVipAddressBasedServers){
26                 break; // if the current vipAddress has servers, we dont use subsequent vipAddress based servers
27             }
28         }
29     }
30     return serverList;
31 }

注意這里的 vipAddress 其實就是服務名:

最后看 EurekaClient 的 getInstancesByVipAddress,到這里就很清楚了,其實就是從 DiscoveryClient 的本地應用 Applications 中根據服務名取出所有的實體串列,

這里就和 Eureka 原始碼那塊銜接上了,eureka-client 全量抓取注冊表以及每隔30秒增量抓取注冊表,都是合并到本地的 Applications 中,Ribbon 與 Eureka 結合后,Ribbon 獲取 Server 就從 DiscoveryClient 的 Applications 中獲取 Server 串列了,

 1 public List<InstanceInfo> getInstancesByVipAddress(String vipAddress, boolean secure, String region) {
 2     // ...
 3     Applications applications;
 4     if (instanceRegionChecker.isLocalRegion(region)) {
 5         // 取本地應用 Applications
 6         applications = this.localRegionApps.get();
 7     } else {
 8         applications = remoteRegionVsApps.get(region);
 9         if (null == applications) {
10             return Collections.emptyList();
11         }
12     }
13 
14     if (!secure) {
15         // 回傳服務名對應的實體
16         return applications.getInstancesByVirtualHostName(vipAddress);
17     } else {
18         return applications.getInstancesBySecureVirtualHostName(vipAddress);
19     }
20 }

5、定時更新Server串列

DynamicServerListLoadBalancer 初始化時,有個方法還沒說,就是 enableAndInitLearnNewServersFeature(),這個方法只是呼叫 ServerListUpdater 啟動了一個 UpdateAction,這個 UpdateAction 又只是呼叫了一下 updateListOfServers 方法,就是前面講解過的全量更新 Server 的邏輯,

 1 public void enableAndInitLearnNewServersFeature() {
 2     serverListUpdater.start(updateAction);
 3 }
 4 
 5 protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() {
 6     @Override
 7     public void doUpdate() {
 8         // 呼叫 updateListOfServers
 9         updateListOfServers();
10     }
11 };

ServerListUpdater 的默認實作類是 PollingServerListUpdater,看下它的 start 方法:

其實就是以固定的頻率,每隔30秒呼叫一下 updateListOfServers 方法,將 DiscoveryClient 中 Applications 中快取的實體同步到 ILoadBalancer 中的 allServerList 串列中,

 1 public synchronized void start(final UpdateAction updateAction) {
 2     if (isActive.compareAndSet(false, true)) {
 3         final Runnable wrapperRunnable = new Runnable() {
 4             @Override
 5             public void run() {
 6                 // ...
 7                 try {
 8                     // 執行一次 updateListOfServers
 9                     updateAction.doUpdate();
10                     // 設定最后更新時間
11                     lastUpdated = System.currentTimeMillis();
12                 } catch (Exception e) {
13                     logger.warn("Failed one update cycle", e);
14                 }
15             }
16         };
17         
18         // 固定頻率調度
19         scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay(
20                 wrapperRunnable,
21                 initialDelayMs, // 默認 1000
22                 refreshIntervalMs, // 默認 30 * 1000
23                 TimeUnit.MILLISECONDS
24         );
25     } else {
26         logger.info("Already active, no-op");
27     }
28 }

6、判斷Server是否存活

在創建 ILoadBalancer 時,IPing 還沒有看過是如何作業的,在初始化的時候,可以看到,主要就是設定了當前的 ping,然后重新設定了一個調度任務,默認每隔30秒調度一次 PingTask 任務,

 1 public void setPing(IPing ping) {
 2     if (ping != null) {
 3         if (!ping.equals(this.ping)) {
 4             this.ping = ping;
 5             // 設定 Ping 任務
 6             setupPingTask();
 7         }
 8     } else {
 9         this.ping = null;
10         // cancel the timer task
11         lbTimer.cancel();
12     }
13 }
14 
15 void setupPingTask() {
16     // ...
17     // 創建一個定時調度器
18     lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true);
19     // pingIntervalTime 默認為 30 秒,每隔30秒調度一次 PingTask
20     lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000);
21     // 立即發起以 Ping
22     forceQuickPing();
23 }

ShutdownEnabledTimer 可以簡單了解下,它是繼承自 Timer 的,它在創建的時候向 Runtime 注冊了一個回呼,在 jvm 關閉的時候來取消 Timer 的執行,進而釋放資源,

 1 public class ShutdownEnabledTimer extends Timer {
 2     private static final Logger LOGGER = LoggerFactory.getLogger(ShutdownEnabledTimer.class);
 3 
 4     private Thread cancelThread;
 5     private String name;
 6 
 7     public ShutdownEnabledTimer(String name, boolean daemon) {
 8         super(name, daemon);
 9         this.name = name;
10         // 取消定時器的執行緒
11         this.cancelThread = new Thread(new Runnable() {
12             public void run() {
13                 ShutdownEnabledTimer.super.cancel();
14             }
15         });
16 
17         LOGGER.info("Shutdown hook installed for: {}", this.name);
18         // 向 Runtime 注冊一個鉤子,在 jvm 關閉時,呼叫 cancelThread 取消定時任務
19         Runtime.getRuntime().addShutdownHook(this.cancelThread);
20     }
21 
22     @Override
23     public void cancel() {
24         super.cancel();
25         LOGGER.info("Shutdown hook removed for: {}", this.name);
26         try {
27             Runtime.getRuntime().removeShutdownHook(this.cancelThread);
28         } catch (IllegalStateException ise) {
29             LOGGER.info("Exception caught (might be ok if at shutdown)", ise);
30         }
31 
32     }
33 }
View Code

再來看下 PingTask,PingTask 核心邏輯就是遍歷 allServers 串列,使用 IPingStrategy 和 IPing 來判斷 Server 是否存活,并更新 Server 的狀態,以及將所有存活的 Server 更新到 upServerList 中,upServerList 快取了所有存活的 Server,

 1 class PingTask extends TimerTask {
 2     public void run() {
 3         try {
 4             // pingStrategy => SerialPingStrategy
 5             new Pinger(pingStrategy).runPinger();
 6         } catch (Exception e) {
 7             logger.error("LoadBalancer [{}]: Error pinging", name, e);
 8         }
 9     }
10 }
11 
12 class Pinger {
13     private final IPingStrategy pingerStrategy;
14 
15     public Pinger(IPingStrategy pingerStrategy) {
16         this.pingerStrategy = pingerStrategy;
17     }
18 
19     public void runPinger() throws Exception {
20         if (!pingInProgress.compareAndSet(false, true)) { 
21             return; // Ping in progress - nothing to do
22         }
23 
24         Server[] allServers = null;
25         boolean[] results = null;
26 
27         Lock allLock = null;
28         Lock upLock = null;
29         
30         try {
31             allLock = allServerLock.readLock();
32             allLock.lock();
33             // 加讀鎖,取出 allServerList 中的 Server
34             allServers = allServerList.toArray(new Server[allServerList.size()]);
35             allLock.unlock();
36 
37             int numCandidates = allServers.length;
38             // 使用 IPingStrategy 和 IPing 對所有 Server 發起 ping 請求
39             results = pingerStrategy.pingServers(ping, allServers);
40 
41             final List<Server> newUpList = new ArrayList<Server>();
42             final List<Server> changedServers = new ArrayList<Server>();
43 
44             for (int i = 0; i < numCandidates; i++) {
45                 boolean isAlive = results[i];
46                 Server svr = allServers[i];
47                 boolean oldIsAlive = svr.isAlive();
48                 // 設定 alive 是否存活
49                 svr.setAlive(isAlive);
50 
51                 // 實體變更
52                 if (oldIsAlive != isAlive) {
53                     changedServers.add(svr);
54                     logger.debug("LoadBalancer [{}]:  Server [{}] status changed to {}",  name, svr.getId(), (isAlive ? "ALIVE" : "DEAD"));
55                 }
56 
57                 // 添加存活的 Server
58                 if (isAlive) {
59                     newUpList.add(svr);
60                 }
61             }
62             upLock = upServerLock.writeLock();
63             upLock.lock();
64             // 更新 upServerList,upServerList 只保存了存活的 Server
65             upServerList = newUpList;
66             upLock.unlock();
67             // 通知變更
68             notifyServerStatusChangeListener(changedServers);
69         } finally {
70             pingInProgress.set(false);
71         }
72     }
73 }
View Code

IPingStrategy 的默認實作類是 SerialPingStrategy,進入可以發現它只是遍歷所有 Server,然后用 IPing 判斷 Server 是否存活,

 1 private static class SerialPingStrategy implements IPingStrategy {
 2     @Override
 3     public boolean[] pingServers(IPing ping, Server[] servers) {
 4         int numCandidates = servers.length;
 5         boolean[] results = new boolean[numCandidates];
 6 
 7         for (int i = 0; i < numCandidates; i++) {
 8             results[i] = false;
 9             try {
10                 if (ping != null) {
11                     // 使用 IPing 判斷 Server 是否存活
12                     results[i] = ping.isAlive(servers[i]);
13                 }
14             } catch (Exception e) {
15                 logger.error("Exception while pinging Server: '{}'", servers[i], e);
16             }
17         }
18         return results;
19     }
20 }

在集成 eureka-client 后,IPing默認實作類是 NIWSDiscoveryPing,看它的 isAlive 方法,其實就是判斷對應 Server 的實體 InstanceInfo 的狀態是否是 UP 狀態,UP狀態就表示 Server 存活,

 1 public boolean isAlive(Server server) {
 2     boolean isAlive = true;
 3     if (server!=null && server instanceof DiscoveryEnabledServer){
 4         DiscoveryEnabledServer dServer = (DiscoveryEnabledServer)server;                
 5         InstanceInfo instanceInfo = dServer.getInstanceInfo();
 6         if (instanceInfo!=null){                    
 7             InstanceStatus status = instanceInfo.getStatus();
 8             if (status!=null){
 9                 // 判斷Server對應的實體狀態是否是 UP
10                 isAlive = status.equals(InstanceStatus.UP);
11             }
12         }
13     }
14     return isAlive;
15 }

7、一張圖總結 Ribbon 核心原理

① Ribbon 核心作業原理總結

  • 首先,Ribbon 的7個核心介面共同定義了 Ribbon 的行為特性,它們就是 Ribbon 的核心骨架,
  • 使用 Ribbon 來對客戶端做負載均衡,基本的用法就是用 @LoadBalanced 注解標注一個 RestTemplate 的 bean 物件,之后在 LoadBalancerAutoConfiguration 配置類中會對帶有 @LoadBalanced 注解的 RestTemplate 添加 LoadBalancerInterceptor 攔截器,
  • LoadBalancerInterceptor 會攔截 RestTemplate 的 HTTP 請求,將請求系結進 Ribbon 負載均衡的生命周期,然后使用 LoadBalancerClient 的 execute 方法來處理請求,
  • LoadBalancerClient 首先會得到一個 ILoadBalancer,再使用它去得到一個 Server,這個 Server 就是具體某一個實體的資訊封裝,得到 Server 之后,就用 Server 的 IP 和埠重構原始 URI,
  • ILoadBalancer 最終在選擇實體的時候,會通過 IRule 均衡策略來選擇一個 Server,
  • ILoadBalancer 的父類 BaseLoadBalancer 中有一個 allServerList 串列快取了所有 Server,Ribbon 中 Server 的來源就是 allServerList,
  • 在加載Ribbon客戶端背景關系時,ILoadBalancer 會用 ServerList 從 DiscoveryClient 的 Applications 中獲取客戶端對應的實體串列,然后使用 ServerListFilter 過濾,最后更新到 allServerList 中,
  • ILoadBalancer 還會開啟一個后臺任務 ServerListUpdater ,每隔30秒運行一次,用 ServerList 將 DiscoveryClient 的 Applications 中的實體串列同步到 allServerList 中,
  • ILoadBalancer 還會開啟一個后臺任務 PingTask,每隔30秒運行一次,用 IPing 判斷 Server 的存活狀態,EurekaClient 環境下,就是判斷 InstanceInfo 的狀態是否為 UP,

② 下面用一張圖來總結下 Ribbon 這塊獲取Server的核心流程以及對應的核心介面間的關系,

五、Ribbon 核心介面

前面已經了解到 Ribbon 核心介面以及默認實作如何協作來查找要呼叫的一個實體,這節再來看下各個核心介面的一些特性及其它實作類,

1、客戶端配置 — IClientConfig

IClientConfig 就是管理客戶端配置的核心介面,它的默認實作類是 DefaultClientConfigImpl,可以看到在創建 IClientConfig 時,設定了 Ribbon 客戶端默認的連接和讀取超時時間為 1 秒,例如讀取如果超過1秒,就會回傳超時,這兩個一般需要根據實際情況來調整,

 1 @Bean
 2 @ConditionalOnMissingBean
 3 public IClientConfig ribbonClientConfig() {
 4     DefaultClientConfigImpl config = new DefaultClientConfigImpl();
 5     // 加載配置
 6     config.loadProperties(this.name);
 7     // 連接超時默認 1 秒
 8     config.set(CommonClientConfigKey.ConnectTimeout, DEFAULT_CONNECT_TIMEOUT);
 9     // 讀取超時默認 1 秒
10     config.set(CommonClientConfigKey.ReadTimeout, DEFAULT_READ_TIMEOUT);
11     config.set(CommonClientConfigKey.GZipPayload, DEFAULT_GZIP_PAYLOAD);
12     return config;
13 }

CommonClientConfigKey 這個類定義了 Ribbon 客戶端相關的所有配置的鍵常量,可以通過這個類來看有哪些配置,

  1 public abstract class CommonClientConfigKey<T> implements IClientConfigKey<T> {
  2 
  3     public static final IClientConfigKey<String> AppName = new CommonClientConfigKey<String>("AppName"){};
  4     
  5     public static final IClientConfigKey<String> Version = new CommonClientConfigKey<String>("Version"){};
  6         
  7     public static final IClientConfigKey<Integer> Port = new CommonClientConfigKey<Integer>("Port"){};
  8     
  9     public static final IClientConfigKey<Integer> SecurePort = new CommonClientConfigKey<Integer>("SecurePort"){};
 10     
 11     public static final IClientConfigKey<String> VipAddress = new CommonClientConfigKey<String>("VipAddress"){};
 12     
 13     public static final IClientConfigKey<Boolean> ForceClientPortConfiguration = new CommonClientConfigKey<Boolean>("ForceClientPortConfiguration"){}; // use client defined port regardless of server advert
 14 
 15     public static final IClientConfigKey<String> DeploymentContextBasedVipAddresses = new CommonClientConfigKey<String>("DeploymentContextBasedVipAddresses"){};
 16     
 17     public static final IClientConfigKey<Integer> MaxAutoRetries = new CommonClientConfigKey<Integer>("MaxAutoRetries"){};
 18     
 19     public static final IClientConfigKey<Integer> MaxAutoRetriesNextServer = new CommonClientConfigKey<Integer>("MaxAutoRetriesNextServer"){};
 20     
 21     public static final IClientConfigKey<Boolean> OkToRetryOnAllOperations = new CommonClientConfigKey<Boolean>("OkToRetryOnAllOperations"){};
 22     
 23     public static final IClientConfigKey<Boolean> RequestSpecificRetryOn = new CommonClientConfigKey<Boolean>("RequestSpecificRetryOn"){};
 24     
 25     public static final IClientConfigKey<Integer> ReceiveBufferSize = new CommonClientConfigKey<Integer>("ReceiveBufferSize"){};
 26     
 27     public static final IClientConfigKey<Boolean> EnablePrimeConnections = new CommonClientConfigKey<Boolean>("EnablePrimeConnections"){};
 28     
 29     public static final IClientConfigKey<String> PrimeConnectionsClassName = new CommonClientConfigKey<String>("PrimeConnectionsClassName"){};
 30     
 31     public static final IClientConfigKey<Integer> MaxRetriesPerServerPrimeConnection = new CommonClientConfigKey<Integer>("MaxRetriesPerServerPrimeConnection"){};
 32     
 33     public static final IClientConfigKey<Integer> MaxTotalTimeToPrimeConnections = new CommonClientConfigKey<Integer>("MaxTotalTimeToPrimeConnections"){};
 34     
 35     public static final IClientConfigKey<Float> MinPrimeConnectionsRatio = new CommonClientConfigKey<Float>("MinPrimeConnectionsRatio"){};
 36     
 37     public static final IClientConfigKey<String> PrimeConnectionsURI = new CommonClientConfigKey<String>("PrimeConnectionsURI"){};
 38     
 39     public static final IClientConfigKey<Integer> PoolMaxThreads = new CommonClientConfigKey<Integer>("PoolMaxThreads"){};
 40     
 41     public static final IClientConfigKey<Integer> PoolMinThreads = new CommonClientConfigKey<Integer>("PoolMinThreads"){};
 42     
 43     public static final IClientConfigKey<Integer> PoolKeepAliveTime = new CommonClientConfigKey<Integer>("PoolKeepAliveTime"){};
 44     
 45     public static final IClientConfigKey<String> PoolKeepAliveTimeUnits = new CommonClientConfigKey<String>("PoolKeepAliveTimeUnits"){};
 46 
 47     public static final IClientConfigKey<Boolean> EnableConnectionPool = new CommonClientConfigKey<Boolean>("EnableConnectionPool") {};
 48     
 49     /**
 50      * Use {@link #MaxConnectionsPerHost}
 51      */
 52     @Deprecated    
 53     public static final IClientConfigKey<Integer> MaxHttpConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxHttpConnectionsPerHost"){};
 54     
 55     /**
 56      * Use {@link #MaxTotalConnections}
 57      */
 58     @Deprecated
 59     public static final IClientConfigKey<Integer> MaxTotalHttpConnections = new CommonClientConfigKey<Integer>("MaxTotalHttpConnections"){};
 60     
 61     public static final IClientConfigKey<Integer> MaxConnectionsPerHost = new CommonClientConfigKey<Integer>("MaxConnectionsPerHost"){};
 62     
 63     public static final IClientConfigKey<Integer> MaxTotalConnections = new CommonClientConfigKey<Integer>("MaxTotalConnections"){};
 64     
 65     public static final IClientConfigKey<Boolean> IsSecure = new CommonClientConfigKey<Boolean>("IsSecure"){};
 66     
 67     public static final IClientConfigKey<Boolean> GZipPayload = new CommonClientConfigKey<Boolean>("GZipPayload"){};
 68     
 69     public static final IClientConfigKey<Integer> ConnectTimeout = new CommonClientConfigKey<Integer>("ConnectTimeout"){};
 70     
 71     public static final IClientConfigKey<Integer> BackoffInterval = new CommonClientConfigKey<Integer>("BackoffTimeout"){};
 72     
 73     public static final IClientConfigKey<Integer> ReadTimeout = new CommonClientConfigKey<Integer>("ReadTimeout"){};
 74     
 75     public static final IClientConfigKey<Integer> SendBufferSize = new CommonClientConfigKey<Integer>("SendBufferSize"){};
 76     
 77     public static final IClientConfigKey<Boolean> StaleCheckingEnabled = new CommonClientConfigKey<Boolean>("StaleCheckingEnabled"){};
 78     
 79     public static final IClientConfigKey<Integer> Linger = new CommonClientConfigKey<Integer>("Linger"){};
 80     
 81     public static final IClientConfigKey<Integer> ConnectionManagerTimeout = new CommonClientConfigKey<Integer>("ConnectionManagerTimeout"){};
 82     
 83     public static final IClientConfigKey<Boolean> FollowRedirects = new CommonClientConfigKey<Boolean>("FollowRedirects"){};
 84     
 85     public static final IClientConfigKey<Boolean> ConnectionPoolCleanerTaskEnabled = new CommonClientConfigKey<Boolean>("ConnectionPoolCleanerTaskEnabled"){};
 86     
 87     public static final IClientConfigKey<Integer> ConnIdleEvictTimeMilliSeconds = new CommonClientConfigKey<Integer>("ConnIdleEvictTimeMilliSeconds"){};
 88     
 89     public static final IClientConfigKey<Integer> ConnectionCleanerRepeatInterval = new CommonClientConfigKey<Integer>("ConnectionCleanerRepeatInterval"){};
 90     
 91     public static final IClientConfigKey<Boolean> EnableGZIPContentEncodingFilter = new CommonClientConfigKey<Boolean>("EnableGZIPContentEncodingFilter"){};
 92     
 93     public static final IClientConfigKey<String> ProxyHost = new CommonClientConfigKey<String>("ProxyHost"){};
 94     
 95     public static final IClientConfigKey<Integer> ProxyPort = new CommonClientConfigKey<Integer>("ProxyPort"){};
 96     
 97     public static final IClientConfigKey<String> KeyStore = new CommonClientConfigKey<String>("KeyStore"){};
 98     
 99     public static final IClientConfigKey<String> KeyStorePassword = new CommonClientConfigKey<String>("KeyStorePassword"){};
100     
101     public static final IClientConfigKey<String> TrustStore = new CommonClientConfigKey<String>("TrustStore"){};
102     
103     public static final IClientConfigKey<String> TrustStorePassword = new CommonClientConfigKey<String>("TrustStorePassword"){};
104     
105     // if this is a secure rest client, must we use client auth too?    
106     public static final IClientConfigKey<Boolean> IsClientAuthRequired = new CommonClientConfigKey<Boolean>("IsClientAuthRequired"){};
107     
108     public static final IClientConfigKey<String> CustomSSLSocketFactoryClassName = new CommonClientConfigKey<String>("CustomSSLSocketFactoryClassName"){};
109      // must host name match name in certificate?
110     public static final IClientConfigKey<Boolean> IsHostnameValidationRequired = new CommonClientConfigKey<Boolean>("IsHostnameValidationRequired"){}; 
111 
112     // see also http://hc.apache.org/httpcomponents-client-ga/tutorial/html/advanced.html
113     public static final IClientConfigKey<Boolean> IgnoreUserTokenInConnectionPoolForSecureClient = new CommonClientConfigKey<Boolean>("IgnoreUserTokenInConnectionPoolForSecureClient"){}; 
114     
115     // Client implementation
116     public static final IClientConfigKey<String> ClientClassName = new CommonClientConfigKey<String>("ClientClassName"){};
117 
118     //LoadBalancer Related
119     public static final IClientConfigKey<Boolean> InitializeNFLoadBalancer = new CommonClientConfigKey<Boolean>("InitializeNFLoadBalancer"){};
120     
121     public static final IClientConfigKey<String> NFLoadBalancerClassName = new CommonClientConfigKey<String>("NFLoadBalancerClassName"){};
122     
123     public static final IClientConfigKey<String> NFLoadBalancerRuleClassName = new CommonClientConfigKey<String>("NFLoadBalancerRuleClassName"){};
124     
125     public static final IClientConfigKey<String> NFLoadBalancerPingClassName = new CommonClientConfigKey<String>("NFLoadBalancerPingClassName"){};
126     
127     public static final IClientConfigKey<Integer> NFLoadBalancerPingInterval = new CommonClientConfigKey<Integer>("NFLoadBalancerPingInterval"){};
128     
129     public static final IClientConfigKey<Integer> NFLoadBalancerMaxTotalPingTime = new CommonClientConfigKey<Integer>("NFLoadBalancerMaxTotalPingTime"){};
130 
131     public static final IClientConfigKey<String> NFLoadBalancerStatsClassName = new CommonClientConfigKey<String>("NFLoadBalancerStatsClassName"){};
132     
133     public static final IClientConfigKey<String> NIWSServerListClassName = new CommonClientConfigKey<String>("NIWSServerListClassName"){};
134 
135     public static final IClientConfigKey<String> ServerListUpdaterClassName = new CommonClientConfigKey<String>("ServerListUpdaterClassName"){};
136     
137     public static final IClientConfigKey<String> NIWSServerListFilterClassName = new CommonClientConfigKey<String>("NIWSServerListFilterClassName"){};
138     
139     public static final IClientConfigKey<Integer> ServerListRefreshInterval = new CommonClientConfigKey<Integer>("ServerListRefreshInterval"){};
140     
141     public static final IClientConfigKey<Boolean> EnableMarkingServerDownOnReachingFailureLimit = new CommonClientConfigKey<Boolean>("EnableMarkingServerDownOnReachingFailureLimit"){};
142     
143     public static final IClientConfigKey<Integer> ServerDownFailureLimit = new CommonClientConfigKey<Integer>("ServerDownFailureLimit"){};
144     
145     public static final IClientConfigKey<Integer> ServerDownStatWindowInMillis = new CommonClientConfigKey<Integer>("ServerDownStatWindowInMillis"){};
146     
147     public static final IClientConfigKey<Boolean> EnableZoneAffinity = new CommonClientConfigKey<Boolean>("EnableZoneAffinity"){};
148     
149     public static final IClientConfigKey<Boolean> EnableZoneExclusivity = new CommonClientConfigKey<Boolean>("EnableZoneExclusivity"){};
150     
151     public static final IClientConfigKey<Boolean> PrioritizeVipAddressBasedServers = new CommonClientConfigKey<Boolean>("PrioritizeVipAddressBasedServers"){};
152     
153     public static final IClientConfigKey<String> VipAddressResolverClassName = new CommonClientConfigKey<String>("VipAddressResolverClassName"){};
154     
155     public static final IClientConfigKey<String> TargetRegion = new CommonClientConfigKey<String>("TargetRegion"){};
156     
157     public static final IClientConfigKey<String> RulePredicateClasses = new CommonClientConfigKey<String>("RulePredicateClasses"){};
158     
159     public static final IClientConfigKey<String> RequestIdHeaderName = new CommonClientConfigKey<String>("RequestIdHeaderName") {};
160     
161     public static final IClientConfigKey<Boolean> UseIPAddrForServer = new CommonClientConfigKey<Boolean>("UseIPAddrForServer") {};
162     
163     public static final IClientConfigKey<String> ListOfServers = new CommonClientConfigKey<String>("listOfServers") {};
164 
165     private static final Set<IClientConfigKey> keys = new HashSet<IClientConfigKey>();
166         
167     // ...
168 }
View Code

進入到 DefaultClientConfigImpl,可以看到 CommonClientConfigKey 中的每個配置都對應了一個默認值,在加載配置的時候,如果用戶沒有定制配置,就會使用默認的配置,

  1 public class DefaultClientConfigImpl implements IClientConfig {
  2 
  3     public static final Boolean DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS = Boolean.TRUE;
  4 
  5     public static final String DEFAULT_NFLOADBALANCER_PING_CLASSNAME = "com.netflix.loadbalancer.DummyPing"; // DummyPing.class.getName();
  6 
  7     public static final String DEFAULT_NFLOADBALANCER_RULE_CLASSNAME = "com.netflix.loadbalancer.AvailabilityFilteringRule";
  8 
  9     public static final String DEFAULT_NFLOADBALANCER_CLASSNAME = "com.netflix.loadbalancer.ZoneAwareLoadBalancer";
 10     
 11     public static final boolean DEFAULT_USEIPADDRESS_FOR_SERVER = Boolean.FALSE;
 12 
 13     public static final String DEFAULT_CLIENT_CLASSNAME = "com.netflix.niws.client.http.RestClient";
 14 
 15     public static final String DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME = "com.netflix.client.SimpleVipAddressResolver";
 16 
 17     public static final String DEFAULT_PRIME_CONNECTIONS_URI = "/";
 18 
 19     public static final int DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS = 30000;
 20 
 21     public static final int DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION = 9;
 22 
 23     public static final Boolean DEFAULT_ENABLE_PRIME_CONNECTIONS = Boolean.FALSE;
 24 
 25     public static final int DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW = Integer.MAX_VALUE;
 26 
 27     public static final int DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS = 60000;
 28 
 29     public static final Boolean DEFAULT_ENABLE_REQUEST_THROTTLING = Boolean.FALSE;
 30 
 31     public static final Boolean DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER = Boolean.FALSE;
 32 
 33     public static final Boolean DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED = Boolean.TRUE;
 34 
 35     public static final Boolean DEFAULT_FOLLOW_REDIRECTS = Boolean.FALSE;
 36 
 37     public static final float DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED = 0.0f;
 38 
 39     public static final int DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER = 1;
 40 
 41     public static final int DEFAULT_MAX_AUTO_RETRIES = 0;
 42 
 43     public static final int DEFAULT_BACKOFF_INTERVAL = 0;
 44     
 45     public static final int DEFAULT_READ_TIMEOUT = 5000;
 46 
 47     public static final int DEFAULT_CONNECTION_MANAGER_TIMEOUT = 2000;
 48 
 49     public static final int DEFAULT_CONNECT_TIMEOUT = 2000;
 50 
 51     public static final Boolean DEFAULT_ENABLE_CONNECTION_POOL = Boolean.TRUE;
 52     
 53     @Deprecated
 54     public static final int DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST = 50;
 55 
 56     @Deprecated
 57     public static final int DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS = 200;
 58 
 59     public static final int DEFAULT_MAX_CONNECTIONS_PER_HOST = 50;
 60 
 61     public static final int DEFAULT_MAX_TOTAL_CONNECTIONS = 200;
 62 
 63     public static final float DEFAULT_MIN_PRIME_CONNECTIONS_RATIO = 1.0f;
 64 
 65     public static final String DEFAULT_PRIME_CONNECTIONS_CLASS = "com.netflix.niws.client.http.HttpPrimeConnection";
 66 
 67     public static final String DEFAULT_SEVER_LIST_CLASS = "com.netflix.loadbalancer.ConfigurationBasedServerList";
 68 
 69     public static final String DEFAULT_SERVER_LIST_UPDATER_CLASS = "com.netflix.loadbalancer.PollingServerListUpdater";
 70 
 71     public static final int DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS = 30000; // every half minute (30 secs)
 72 
 73     public static final int DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS = 30000; // all connections idle for 30 secs
 74     
 75     protected volatile Map<String, Object> properties = new ConcurrentHashMap<String, Object>();
 76     
 77     protected Map<IClientConfigKey<?>, Object> typedProperties = new ConcurrentHashMap<IClientConfigKey<?>, Object>();
 78 
 79     private static final Logger LOG = LoggerFactory.getLogger(DefaultClientConfigImpl.class);
 80 
 81     private String clientName = null;
 82 
 83     private VipAddressResolver resolver = null;
 84 
 85     private boolean enableDynamicProperties = true;
 86     /**
 87      * Defaults for the parameters for the thread pool used by batchParallel
 88      * calls
 89      */
 90     public static final int DEFAULT_POOL_MAX_THREADS = DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS;
 91     public static final int DEFAULT_POOL_MIN_THREADS = 1;
 92     public static final long DEFAULT_POOL_KEEP_ALIVE_TIME = 15 * 60L;
 93     public static final TimeUnit DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS = TimeUnit.SECONDS;
 94     public static final Boolean DEFAULT_ENABLE_ZONE_AFFINITY = Boolean.FALSE;
 95     public static final Boolean DEFAULT_ENABLE_ZONE_EXCLUSIVITY = Boolean.FALSE;
 96     public static final int DEFAULT_PORT = 7001;
 97     public static final Boolean DEFAULT_ENABLE_LOADBALANCER = Boolean.TRUE;
 98 
 99     public static final String DEFAULT_PROPERTY_NAME_SPACE = "ribbon";
100 
101     private String propertyNameSpace = DEFAULT_PROPERTY_NAME_SPACE;
102 
103     public static final Boolean DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS = Boolean.FALSE;
104 
105     public static final Boolean DEFAULT_ENABLE_NIWS_EVENT_LOGGING = Boolean.TRUE;
106 
107     public static final Boolean DEFAULT_IS_CLIENT_AUTH_REQUIRED = Boolean.FALSE;
108 
109     private final Map<String, DynamicStringProperty> dynamicProperties = new ConcurrentHashMap<String, DynamicStringProperty>();
110 
111     public Boolean getDefaultPrioritizeVipAddressBasedServers() {
112         return DEFAULT_PRIORITIZE_VIP_ADDRESS_BASED_SERVERS;
113     }
114 
115     public String getDefaultNfloadbalancerPingClassname() {
116         return DEFAULT_NFLOADBALANCER_PING_CLASSNAME;
117     }
118 
119     public String getDefaultNfloadbalancerRuleClassname() {
120         return DEFAULT_NFLOADBALANCER_RULE_CLASSNAME;
121     }
122 
123     public String getDefaultNfloadbalancerClassname() {
124         return DEFAULT_NFLOADBALANCER_CLASSNAME;
125     }
126     
127     public boolean getDefaultUseIpAddressForServer() {
128         return DEFAULT_USEIPADDRESS_FOR_SERVER;
129     }
130 
131     public String getDefaultClientClassname() {
132         return DEFAULT_CLIENT_CLASSNAME;
133     }
134 
135     public String getDefaultVipaddressResolverClassname() {
136         return DEFAULT_VIPADDRESS_RESOLVER_CLASSNAME;
137     }
138 
139     public String getDefaultPrimeConnectionsUri() {
140         return DEFAULT_PRIME_CONNECTIONS_URI;
141     }
142 
143     public int getDefaultMaxTotalTimeToPrimeConnections() {
144         return DEFAULT_MAX_TOTAL_TIME_TO_PRIME_CONNECTIONS;
145     }
146 
147     public int getDefaultMaxRetriesPerServerPrimeConnection() {
148         return DEFAULT_MAX_RETRIES_PER_SERVER_PRIME_CONNECTION;
149     }
150 
151     public Boolean getDefaultEnablePrimeConnections() {
152         return DEFAULT_ENABLE_PRIME_CONNECTIONS;
153     }
154 
155     public int getDefaultMaxRequestsAllowedPerWindow() {
156         return DEFAULT_MAX_REQUESTS_ALLOWED_PER_WINDOW;
157     }
158 
159     public int getDefaultRequestThrottlingWindowInMillis() {
160         return DEFAULT_REQUEST_THROTTLING_WINDOW_IN_MILLIS;
161     }
162 
163     public Boolean getDefaultEnableRequestThrottling() {
164         return DEFAULT_ENABLE_REQUEST_THROTTLING;
165     }
166 
167     public Boolean getDefaultEnableGzipContentEncodingFilter() {
168         return DEFAULT_ENABLE_GZIP_CONTENT_ENCODING_FILTER;
169     }
170 
171     public Boolean getDefaultConnectionPoolCleanerTaskEnabled() {
172         return DEFAULT_CONNECTION_POOL_CLEANER_TASK_ENABLED;
173     }
174 
175     public Boolean getDefaultFollowRedirects() {
176         return DEFAULT_FOLLOW_REDIRECTS;
177     }
178 
179     public float getDefaultPercentageNiwsEventLogged() {
180         return DEFAULT_PERCENTAGE_NIWS_EVENT_LOGGED;
181     }
182 
183     public int getDefaultMaxAutoRetriesNextServer() {
184         return DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER;
185     }
186 
187     public int getDefaultMaxAutoRetries() {
188         return DEFAULT_MAX_AUTO_RETRIES;
189     }
190 
191     public int getDefaultReadTimeout() {
192         return DEFAULT_READ_TIMEOUT;
193     }
194 
195     public int getDefaultConnectionManagerTimeout() {
196         return DEFAULT_CONNECTION_MANAGER_TIMEOUT;
197     }
198 
199     public int getDefaultConnectTimeout() {
200         return DEFAULT_CONNECT_TIMEOUT;
201     }
202 
203     @Deprecated
204     public int getDefaultMaxHttpConnectionsPerHost() {
205         return DEFAULT_MAX_HTTP_CONNECTIONS_PER_HOST;
206     }
207 
208     @Deprecated
209     public int getDefaultMaxTotalHttpConnections() {
210         return DEFAULT_MAX_TOTAL_HTTP_CONNECTIONS;
211     }
212 
213     public int getDefaultMaxConnectionsPerHost() {
214         return DEFAULT_MAX_CONNECTIONS_PER_HOST;
215     }
216 
217     public int getDefaultMaxTotalConnections() {
218         return DEFAULT_MAX_TOTAL_CONNECTIONS;
219     }
220     
221     public float getDefaultMinPrimeConnectionsRatio() {
222         return DEFAULT_MIN_PRIME_CONNECTIONS_RATIO;
223     }
224 
225     public String getDefaultPrimeConnectionsClass() {
226         return DEFAULT_PRIME_CONNECTIONS_CLASS;
227     }
228 
229     public String getDefaultSeverListClass() {
230         return DEFAULT_SEVER_LIST_CLASS;
231     }
232 
233     public int getDefaultConnectionIdleTimertaskRepeatInMsecs() {
234         return DEFAULT_CONNECTION_IDLE_TIMERTASK_REPEAT_IN_MSECS;
235     }
236 
237     public int getDefaultConnectionidleTimeInMsecs() {
238         return DEFAULT_CONNECTIONIDLE_TIME_IN_MSECS;
239     }
240     
241     public VipAddressResolver getResolver() {
242         return resolver;
243     }
244 
245     public boolean isEnableDynamicProperties() {
246         return enableDynamicProperties;
247     }
248 
249     public int getDefaultPoolMaxThreads() {
250         return DEFAULT_POOL_MAX_THREADS;
251     }
252 
253     public int getDefaultPoolMinThreads() {
254         return DEFAULT_POOL_MIN_THREADS;
255     }
256 
257     public long getDefaultPoolKeepAliveTime() {
258         return DEFAULT_POOL_KEEP_ALIVE_TIME;
259     }
260 
261     public TimeUnit getDefaultPoolKeepAliveTimeUnits() {
262         return DEFAULT_POOL_KEEP_ALIVE_TIME_UNITS;
263     }
264 
265     public Boolean getDefaultEnableZoneAffinity() {
266         return DEFAULT_ENABLE_ZONE_AFFINITY;
267     }
268 
269     public Boolean getDefaultEnableZoneExclusivity() {
270         return DEFAULT_ENABLE_ZONE_EXCLUSIVITY;
271     }
272 
273     public int getDefaultPort() {
274         return DEFAULT_PORT;
275     }
276 
277     public Boolean getDefaultEnableLoadbalancer() {
278         return DEFAULT_ENABLE_LOADBALANCER;
279     }
280 
281 
282     public Boolean getDefaultOkToRetryOnAllOperations() {
283         return DEFAULT_OK_TO_RETRY_ON_ALL_OPERATIONS;
284     }
285 
286     public Boolean getDefaultIsClientAuthRequired(){
287         return DEFAULT_IS_CLIENT_AUTH_REQUIRED;
288     }
289 
290 
291     /**
292      * Create instance with no properties in default name space {@link #DEFAULT_PROPERTY_NAME_SPACE}
293      */
294     public DefaultClientConfigImpl() {
295         this.dynamicProperties.clear();
296         this.enableDynamicProperties = false;
297     }
298 
299     /**
300      * Create instance with no properties in the specified name space
301      */
302     public DefaultClientConfigImpl(String nameSpace) {
303         this();
304         this.propertyNameSpace = nameSpace;
305     }
306 
307     public void loadDefaultValues() {
308         putDefaultIntegerProperty(CommonClientConfigKey.MaxHttpConnectionsPerHost, getDefaultMaxHttpConnectionsPerHost());
309         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalHttpConnections, getDefaultMaxTotalHttpConnections());
310         putDefaultBooleanProperty(CommonClientConfigKey.EnableConnectionPool, getDefaultEnableConnectionPool());
311         putDefaultIntegerProperty(CommonClientConfigKey.MaxConnectionsPerHost, getDefaultMaxConnectionsPerHost());
312         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalConnections, getDefaultMaxTotalConnections());
313         putDefaultIntegerProperty(CommonClientConfigKey.ConnectTimeout, getDefaultConnectTimeout());
314         putDefaultIntegerProperty(CommonClientConfigKey.ConnectionManagerTimeout, getDefaultConnectionManagerTimeout());
315         putDefaultIntegerProperty(CommonClientConfigKey.ReadTimeout, getDefaultReadTimeout());
316         putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetries, getDefaultMaxAutoRetries());
317         putDefaultIntegerProperty(CommonClientConfigKey.MaxAutoRetriesNextServer, getDefaultMaxAutoRetriesNextServer());
318         putDefaultBooleanProperty(CommonClientConfigKey.OkToRetryOnAllOperations, getDefaultOkToRetryOnAllOperations());
319         putDefaultBooleanProperty(CommonClientConfigKey.FollowRedirects, getDefaultFollowRedirects());
320         putDefaultBooleanProperty(CommonClientConfigKey.ConnectionPoolCleanerTaskEnabled, getDefaultConnectionPoolCleanerTaskEnabled());
321         putDefaultIntegerProperty(CommonClientConfigKey.ConnIdleEvictTimeMilliSeconds, getDefaultConnectionidleTimeInMsecs());
322         putDefaultIntegerProperty(CommonClientConfigKey.ConnectionCleanerRepeatInterval, getDefaultConnectionIdleTimertaskRepeatInMsecs());
323         putDefaultBooleanProperty(CommonClientConfigKey.EnableGZIPContentEncodingFilter, getDefaultEnableGzipContentEncodingFilter());
324         String proxyHost = ConfigurationManager.getConfigInstance().getString(getDefaultPropName(CommonClientConfigKey.ProxyHost.key()));
325         if (proxyHost != null && proxyHost.length() > 0) {
326             setProperty(CommonClientConfigKey.ProxyHost, proxyHost);
327         }
328         Integer proxyPort = ConfigurationManager
329                 .getConfigInstance()
330                 .getInteger(
331                         getDefaultPropName(CommonClientConfigKey.ProxyPort),
332                         (Integer.MIN_VALUE + 1)); // + 1 just to avoid potential clash with user setting
333         if (proxyPort != (Integer.MIN_VALUE + 1)) {
334             setProperty(CommonClientConfigKey.ProxyPort, proxyPort);
335         }
336         putDefaultIntegerProperty(CommonClientConfigKey.Port, getDefaultPort());
337         putDefaultBooleanProperty(CommonClientConfigKey.EnablePrimeConnections, getDefaultEnablePrimeConnections());
338         putDefaultIntegerProperty(CommonClientConfigKey.MaxRetriesPerServerPrimeConnection, getDefaultMaxRetriesPerServerPrimeConnection());
339         putDefaultIntegerProperty(CommonClientConfigKey.MaxTotalTimeToPrimeConnections, getDefaultMaxTotalTimeToPrimeConnections());
340         putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsURI, getDefaultPrimeConnectionsUri());
341         putDefaultIntegerProperty(CommonClientConfigKey.PoolMinThreads, getDefaultPoolMinThreads());
342         putDefaultIntegerProperty(CommonClientConfigKey.PoolMaxThreads, getDefaultPoolMaxThreads());
343         putDefaultLongProperty(CommonClientConfigKey.PoolKeepAliveTime, getDefaultPoolKeepAliveTime());
344         putDefaultTimeUnitProperty(CommonClientConfigKey.PoolKeepAliveTimeUnits, getDefaultPoolKeepAliveTimeUnits());
345         putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneAffinity, getDefaultEnableZoneAffinity());
346         putDefaultBooleanProperty(CommonClientConfigKey.EnableZoneExclusivity, getDefaultEnableZoneExclusivity());
347         putDefaultStringProperty(CommonClientConfigKey.ClientClassName, getDefaultClientClassname());
348         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerClassName, getDefaultNfloadbalancerClassname());
349         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerRuleClassName, getDefaultNfloadbalancerRuleClassname());
350         putDefaultStringProperty(CommonClientConfigKey.NFLoadBalancerPingClassName, getDefaultNfloadbalancerPingClassname());
351         putDefaultBooleanProperty(CommonClientConfigKey.PrioritizeVipAddressBasedServers, getDefaultPrioritizeVipAddressBasedServers());
352         putDefaultFloatProperty(CommonClientConfigKey.MinPrimeConnectionsRatio, getDefaultMinPrimeConnectionsRatio());
353         putDefaultStringProperty(CommonClientConfigKey.PrimeConnectionsClassName, getDefaultPrimeConnectionsClass());
354         putDefaultStringProperty(CommonClientConfigKey.NIWSServerListClassName, getDefaultSeverListClass());
355         putDefaultStringProperty(CommonClientConfigKey.VipAddressResolverClassName, getDefaultVipaddressResolverClassname());
356         putDefaultBooleanProperty(CommonClientConfigKey.IsClientAuthRequired, getDefaultIsClientAuthRequired());
357         // putDefaultStringProperty(CommonClientConfigKey.RequestIdHeaderName, getDefaultRequestIdHeaderName());
358         putDefaultBooleanProperty(CommonClientConfigKey.UseIPAddrForServer, getDefaultUseIpAddressForServer());
359         putDefaultStringProperty(CommonClientConfigKey.ListOfServers, "");
360     }
361 }
View Code

也可以在組態檔中定制配置,例如配置超時和重試:

 1 # 全域配置
 2 ribbon:
 3   # 客戶端讀取超時時間
 4   ReadTimeout: 3000
 5   # 客戶端連接超時時間
 6   ConnectTimeout: 3000
 7   # 默認只重試 GET,設定為 true 時將重試所有型別,如 POST、PUT、DELETE
 8   OkToRetryOnAllOperations: false
 9   # 重試次數
10   MaxAutoRetries: 1
11   # 最多重試幾個實體
12   MaxAutoRetriesNextServer: 1
13 
14 # 只針對 demo-producer 客戶端
15 demo-producer:
16   ribbon:
17     # 客戶端讀取超時時間
18     ReadTimeout: 5000
19     # 客戶端連接超時時間
20     ConnectTimeout: 3000

2、均衡策略 — IRule

IRule 是最終選擇 Server 的策略規則類,核心的介面就是 choose,

 1 public interface IRule{
 2     
 3     // 選擇 Server
 4     public Server choose(Object key);
 5     
 6     // 設定 ILoadBalancer
 7     public void setLoadBalancer(ILoadBalancer lb);
 8     
 9     // 獲取 ILoadBalancer
10     public ILoadBalancer getLoadBalancer();    
11 }

Ribbon 提供了豐富的負載均衡策略,我們也可以通過配置指定使用某個均衡策略,下面是整個Ribbon提供的 IRule 均衡策略,

3、服務檢查 — IPing

IPing 是用于定期檢查 Server 的可用性的,它只提供了一個介面,用來判斷 Server 是否存活:

1 public interface IPing {
2     
3     public boolean isAlive(Server server);
4 }

IPing 也提供了多種策略可選,下面是整個 IPing 體系結構:

4、獲取服務串列 — ServerList

ServerList 提供了兩個介面,一個是第一次獲取 Server 串列,一個是更新 Server 串列,其中 getUpdatedListOfServers 會每被 Loadbalancer 隔 30 秒調一次來更新 allServerList,

 1 public interface ServerList<T extends Server> {
 2 
 3     public List<T> getInitialListOfServers();
 4     
 5     /**
 6      * Return updated list of servers. This is called say every 30 secs
 7      * (configurable) by the Loadbalancer's Ping cycle
 8      */
 9     public List<T> getUpdatedListOfServers();   
10 }

ServerList 也提供了多種實作,ServerList 體系結構如下:

5、過濾服務 — ServerListFilter

ServerListFilter 提供了一個介面用來過濾出可用的 Server,

1 public interface ServerListFilter<T extends Server> {
2 
3     public List<T> getFilteredListOfServers(List<T> servers);
4 }

ServerListFilter 體系結構如下:

6、服務串列更新 — ServerListUpdater

ServerListUpdater 有多個介面,最核心的就是 start 開啟定時任務呼叫 updateAction 來更新 allServerList,

 1 public interface ServerListUpdater {
 2 
 3     /**
 4      * an interface for the updateAction that actually executes a server list update
 5      */
 6     public interface UpdateAction {
 7         void doUpdate();
 8     }
 9 
10     /**
11      * start the serverList updater with the given update action
12      * This call should be idempotent.
13      */
14     void start(UpdateAction updateAction);
15 }

默認有兩個實作類:

7、負載均衡器 — ILoadBalancer

ILoadBalancer 是負載均衡選擇服務的核心介面,主要提供了如下的獲取Server串列和根據客戶端名稱選擇Server的介面,

 1 public interface ILoadBalancer {
 2 
 3     // 添加Server
 4     public void addServers(List<Server> newServers);
 5     
 6     // 根據key選擇一個Server
 7     public Server chooseServer(Object key);
 8     
 9     // 獲取存活的Server串列,回傳 upServerList
10     public List<Server> getReachableServers();
11 
12     // 獲取所有Server串列,回傳 allServerList
13     public List<Server> getAllServers();
14 }

ILoadBalancer 的體系結構如下:

 

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/235909.html

標籤:其他

上一篇:探索ArrayList底層實作

下一篇:JDK原始碼之ArrayList-Iterator

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more