Soul網關原始碼閱讀(十四)—— Hystrix插件詳解
文章目錄
- Soul網關原始碼閱讀(十四)—— Hystrix插件詳解
- 概要
- 服務熔斷、降級的場景
- Hystrix原理
- soul-plugin-hystrix實戰
- 總結
概要
上一篇介紹了hystrix插件的使用方法,這一篇我們來詳細介紹一下hystrix的原理及使用方法,并實作一個簡單的demo,
服務熔斷、降級的場景
微服務架構下,客戶端發起一個業務請求,通常情況下會在后端進行多次服務之間的呼叫,
試想,如果后端服務的呼叫順序為A->B->C,如果C宕機了無法回應,那么B的請求執行緒作為呼叫方也會阻塞,最終可能導致服務器執行緒池中的執行緒爆掉,而導致B服務也不可用,直到整個系統崩潰,導致服務雪崩,

為了解決以上問題,我們可以進行服務熔斷和降級處理,熔斷和降級一般都是成對出現的,但是他們又有一些區別,
熔斷是指依賴的外部介面出現故障的情況斷絕和外部介面的關系,
降級是指由于自身不能提供正常服務而采取的迫不得已的處理手段,
打個比方就是A呼叫B,B宕機了不能正常回應,A嘗試了幾次都沒能正常訪問B,于是A決定斷絕與B的互動,這個程序叫熔斷,
但是A的可能也是服務方,它接收客戶端C的呼叫請求,由于A熔斷了B不能提供正常服務,但是它還是得給C一個交代,迫不得已采取一個替代方案,諸如回傳一些報錯資訊給A,是整個呼叫流程不受阻塞,這個程序叫服務降級,
搞清楚我們面臨的問題過后,我們來思考一下對應的解決方案,
- 解決因為服務B不可用,而導致服務A因為執行緒阻塞而被打爆的問題,
- 服務A如何判定服務B不可用,也就是需要一個抽象的熔斷規則,當滿足熔斷條件就關閉與B的呼叫,反之就開打,
- 熔斷后,需要一個代替方案,需要定義熔斷后的降級策略,
以上三個問題的核心在于問題1,如何避免服務A因為執行緒阻塞且增長導致的宕機?
容易想到的辦法就是將A呼叫B的執行緒,從服務器(如tomcat)接管過來,不讓tomcat直接呼叫B,而是先交給我們的熔斷器進行管理和處理,熔斷器有權不進行服務B的呼叫,而采取降級策略,
Hystrix原理
Hystrix是解決以上場景的解決方案,下圖展示了當你用使用 Hystrix 封裝后的客戶端請求一個服務時的流程,其中抽象的概念后面在一一解釋,
流程圖

1. 創建 HystrixCommand 或 HystrixObservableCommand 物件
? 這兩個物件則是我們請求的委托的物件,他們負責發起請求,對于他們的區別,暫時先記住:
- HystrixCommand用在依賴服務回傳單個操作結果的時候,有兩種執行方式
- execute():同步執行,從依賴的服務回傳一個單一的結果物件,或是在發生錯誤的時候拋出例外,
- queue():異步執行,直接回傳一個Future物件,其中包含了服務執行結束時要回傳的單一結果物件,
- HystrixObservableCommand 用在依賴服務回傳多個操作結果的時候,也實作了兩種執行方式
- observe():回傳Obervable物件,他代表了操作的多個結果,他是一個HotObservable
- toObservable():同樣回傳Observable物件,也代表了操作多個結果,但它回傳的是一個Cold Observable,
2. 執行 command
一共有四種方式可以執行 command,其中前兩種方式都只適用于簡單的 HystrixCommand 物件:
- excute() — 以阻塞方式運行,并回傳回傳其包裝物件的回應值,或者拋出例外
- queue() — 回傳一個 Future 物件,你可以選擇在適當時機 get
- observe() —
- toObservable() —
K value = command.execute();
Future fValue = command.queue();
Observable ohValue = command.observe(); //hot observable
Observable ocValue = command.toObservable(); //cold observable
實際上,同步方法 execute() 底層邏輯是呼叫 queue().get(),然后 queue() 實際上是呼叫了 toObservable().toBlocking().toFuture(),也就是說所有 HystrixCommand 的邏輯都是走 Observable 實作
3. 請求是否使用快取
如果開啟了請求快取,并且該回應可以在快取中找到,那就立刻回傳快取的回應值,而不會再走遠程呼叫邏輯
4. 是否開啟熔斷
當執行 command 時,Hystrix 會判斷熔斷是否開啟,如果是開啟狀態則走 (8) 進行 Fallback 降級策略,如果未開啟則走 (5) ,繼續下一步判斷是否可以執行 command
5. 執行緒池\佇列\信號量 是否已滿
如果上述三者已達到閾值,Hystrix 就會直接走 (8) 進行 Fallback 降級策略
6. HystrixObservableCommand.construct() 或 HystrixCommand.run()
執行呼叫邏輯,
7. 判斷斷路器健康狀態
8. 進行降級處理
9. 接收回應
soul-plugin-hystrix實戰
了解了hystrix的原理及使用流程過后我們來分析一下,soul中對hystrix的實作,
我們先看一下其目錄結構:

-
HystrixBuilder
它是一個構造器,用于構造我們創建HystrixCommand或者HystrixObservableCommand是的構造引數,它封裝了我們的熔斷規則,
/** * this is build HystrixObservableCommand.Setter. * * @param hystrixHandle {@linkplain HystrixHandle} * @return {@linkplain HystrixObservableCommand.Setter} */ public static HystrixObservableCommand.Setter build(final HystrixHandle hystrixHandle) { //設定默認值 initHystrixHandleOnRequire(hystrixHandle); //groupKey HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey()); HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey()); HystrixCommandProperties.Setter propertiesSetter = HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout()) .withCircuitBreakerEnabled(true) .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) .withExecutionIsolationSemaphoreMaxConcurrentRequests(hystrixHandle.getMaxConcurrentRequests()) .withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage()) .withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold()) .withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds()); return HystrixObservableCommand.Setter .withGroupKey(groupKey) .andCommandKey(commandKey) .andCommandPropertiesDefaults(propertiesSetter); } /** * this is build HystrixCommand.Setter. * @param hystrixHandle {@linkplain HystrixHandle} * @return {@linkplain HystrixCommand.Setter} */ public static HystrixCommand.Setter buildForHystrixCommand(final HystrixHandle hystrixHandle) { initHystrixHandleOnRequire(hystrixHandle); HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey(hystrixHandle.getGroupKey()); HystrixCommandKey commandKey = HystrixCommandKey.Factory.asKey(hystrixHandle.getCommandKey()); HystrixCommandProperties.Setter propertiesSetter = HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds((int) hystrixHandle.getTimeout()) .withCircuitBreakerEnabled(true) .withCircuitBreakerErrorThresholdPercentage(hystrixHandle.getErrorThresholdPercentage()) .withCircuitBreakerRequestVolumeThreshold(hystrixHandle.getRequestVolumeThreshold()) .withCircuitBreakerSleepWindowInMilliseconds(hystrixHandle.getSleepWindowInMilliseconds()); HystrixThreadPoolConfig hystrixThreadPoolConfig = hystrixHandle.getHystrixThreadPoolConfig(); HystrixThreadPoolProperties.Setter threadPoolPropertiesSetter = HystrixThreadPoolProperties.Setter() .withCoreSize(hystrixThreadPoolConfig.getCoreSize()) .withMaximumSize(hystrixThreadPoolConfig.getMaximumSize()) .withMaxQueueSize(hystrixThreadPoolConfig.getMaxQueueSize()) .withKeepAliveTimeMinutes(hystrixThreadPoolConfig.getKeepAliveTimeMinutes()) .withAllowMaximumSizeToDivergeFromCoreSize(true); return HystrixCommand.Setter .withGroupKey(groupKey) .andCommandKey(commandKey) .andCommandPropertiesDefaults(propertiesSetter) .andThreadPoolPropertiesDefaults(threadPoolPropertiesSetter); } -
Command
執行命令的介面,HystrixCommand和HystrixObservableCommand的擴展類需要實作它,
它實作了一個默認的降級方法doFallback,主要邏輯是將請求跳轉到降級的uri上進行處理,
/** * do fall back when some error occurs on hystrix execute. * @param exchange {@link ServerWebExchange} * @param exception {@link Throwable} * @return {@code Mono<Void>} to indicate when request processing is complete. */ default Mono<Void> doFallback(ServerWebExchange exchange, Throwable exception) { if (Objects.isNull(getCallBackUri())) { Object error; error = generateError(exchange, exception); return WebFluxResultUtils.result(exchange, error); } DispatcherHandler dispatcherHandler = SpringBeanUtils.getInstance().getBean(DispatcherHandler.class); ServerHttpRequest request = exchange.getRequest().mutate().uri(getCallBackUri()).build(); ServerWebExchange mutated = exchange.mutate().request(request).build(); return dispatcherHandler.handle(mutated); } -
HystrixCommand(這里名字剛好取的相反,不要混淆了,)
它是HystrixObservableCommand的擴展類,主要是實作器construct方法
@Override protected Observable<Void> construct() { return RxReactiveStreams.toObservable(chain.execute(exchange)); } -
HystrixCommandOnThread
它是HystrixCommand的擴展類,主要是實作run方法
@Override protected Mono<Void> run() { RxReactiveStreams.toObservable(chain.execute(exchange)).toBlocking().subscribe(); return Mono.empty(); } -
HystrixPlugin
實作doEexcute方法,使用HystrixCommand或HystrixObservableCommand來呼叫請求,
@Slf4j public class HystrixPlugin extends AbstractSoulPlugin { @Override protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) { final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT); assert soulContext != null; //構造從admin配置的規則封裝成HystrixHandle物件 final HystrixHandle hystrixHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), HystrixHandle.class); if (StringUtils.isBlank(hystrixHandle.getGroupKey())) { hystrixHandle.setGroupKey(Objects.requireNonNull(soulContext).getModule()); } if (StringUtils.isBlank(hystrixHandle.getCommandKey())) { hystrixHandle.setCommandKey(Objects.requireNonNull(soulContext).getMethod()); } //根據HystrixIsolationModeEnum型別,選擇構建command物件,信號量——HystrixCommand,執行緒池——HystrixCommandOnThread(見步驟1) Command command = fetchCommand(hystrixHandle, exchange, chain); return Mono.create(s -> { //執行command,execute方法或者toObservable方法(見步驟2) Subscription sub = command.fetchObservable().subscribe(s::success, s::error, s::success); s.onCancel(sub::unsubscribe); //如果熔斷器打開會列印以下日志 if (command.isCircuitBreakerOpen()) { log.error("hystrix execute have circuitBreaker is Open! groupKey:{},commandKey:{}", hystrixHandle.getGroupKey(), hystrixHandle.getCommandKey()); } }).doOnError(throwable -> { //例外處理 log.error("hystrix execute exception:", throwable); exchange.getAttributes().put(Constants.CLIENT_RESPONSE_RESULT_TYPE, ResultEnum.ERROR.getName()); chain.execute(exchange); }).then(); } private Command fetchCommand(final HystrixHandle hystrixHandle, final ServerWebExchange exchange, final SoulPluginChain chain) { if (hystrixHandle.getExecutionIsolationStrategy() == HystrixIsolationModeEnum.SEMAPHORE.getCode()) { return new HystrixCommand(HystrixBuilder.build(hystrixHandle), exchange, chain, hystrixHandle.getCallBackUri()); } return new HystrixCommandOnThread(HystrixBuilder.buildForHystrixCommand(hystrixHandle), exchange, chain, hystrixHandle.getCallBackUri()); } //... }
總結
這一篇粗略的介紹了服務熔斷、降級的基本概念,以及hystrix的實作原理,
其中還有很多細節地方沒有分析到,比如隔離策略執行緒池和信號量的實作原理是什么,他們的區別是什么?還有待繼續深入分析,另外一點是由于hystrix是基于事件流rxjava庫構建的,所以原始碼中使用了大量的鏈式呼叫、異步處理等邏輯,所以需要補充這兩個基礎知識:
- 執行緒隔離策略實作原理,
- rxjava語法規則及原理,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/255671.html
標籤:其他
上一篇:訊息佇列之-RabbitMQ
