主頁 >  其他 > 微服務組件--限流框架Spring Cloud Hystrix分析

微服務組件--限流框架Spring Cloud Hystrix分析

2022-10-22 08:04:43 其他

Hystrix的介紹

【1】Hystrix是springCloud的組件之一,Hystrix 可以讓我們在分布式系統中對服務間的呼叫進行控制加入一些呼叫延遲或者依賴故障的容錯機制,

【2】Hystrix 通過將依賴服務進行資源隔離進而阻止某個依賴服務出現故障時在整個系統所有的依賴服務呼叫中進行蔓延;【防止服務雪崩

【3】其核心功能:

  1)服務隔離(服務限流)

    通過執行緒池或者信號量判斷是否已滿,超出容量的請求直接降級,以達到限流的作用,

  2)服務熔斷

    當失敗率達到閾值自動觸發降級,熔斷器觸發的快速失敗會有助于系統防止崩潰,【可以說熔斷是特定條件的降級

  3)服務降級

    服務降級是當服務器壓力劇增的情況下,根據當前業務情況及流量對一些服務和頁面有策略的降級,以此釋放服務器資源以保證核心任務的正常運行,

 

Hystrix的簡單使用

【1】引入依賴

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
</dependency>

【2】啟動類開啟hystrix功能

@SpringBootApplication
//注冊到eureka
@EnableEurekaClient
//開啟斷路器功能
@EnableCircuitBreaker
public class WebApplication {

【3】注解@HystrixCommand引數分析

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Inherited
@Documented
public @interface HystrixCommand {
    // HystrixCommand 命令所屬的組的名稱:默認注解方法類的名稱
    String groupKey() default "";
    // HystrixCommand 命令的key值,默認值為注解方法的名稱
    String commandKey() default "";
    // 執行緒池名稱,默認定義為groupKey
    String threadPoolKey() default "";
    // 定義回退方法的名稱, 此方法必須和hystrix的執行方法在相同類中
    String fallbackMethod() default "";
    // 配置hystrix命令的引數
    HystrixProperty[] commandProperties() default {};
    // 配置hystrix依賴的執行緒池的引數
    HystrixProperty[] threadPoolProperties() default {};
    // 如果hystrix方法拋出的例外包括RUNTIME_EXCEPTION,則會被封裝HystrixRuntimeException例外,我們也可以通過此方法定義哪些需要忽略的例外
    Class<? extends Throwable>[] ignoreExceptions() default {};
    // 定義執行hystrix observable的命令的模式,型別詳細見ObservableExecutionMode
    ObservableExecutionMode observableExecutionMode() default ObservableExecutionMode.EAGER;
    // 如果hystrix方法拋出的例外包括RUNTIME_EXCEPTION,則會被封裝HystrixRuntimeException例外,此方法定義需要拋出的例外
    HystrixException[] raiseHystrixExceptions() default {};
    // 定義回呼方法:但是defaultFallback不能傳入引數,回傳引數和hystrix的命令兼容
    String defaultFallback() default "";
}

【4】使用示例

//執行緒池隔離的設定,執行緒池隔離與信號量隔離的最大區別在于發送請求的執行緒,信號量是采用呼叫方法的執行緒,而執行緒池則是用池內的執行緒去發送請求
@HystrixCommand(
        groupKey="test-provider",
        threadPoolKey="test-provider",
        threadPoolProperties = {
                @HystrixProperty(name = "coreSize", value = "https://www.cnblogs.com/chafry/p/20"),//執行緒池大小
                @HystrixProperty(name = "maximumSize", value = "https://www.cnblogs.com/chafry/p/30"),//最大執行緒池大小
                @HystrixProperty(name = "maxQueueSize", value = "https://www.cnblogs.com/chafry/p/20"),//最大佇列長度
                @HystrixProperty(name =  "keepAliveTimeMinutes", value = "https://www.cnblogs.com/chafry/p/2")//執行緒存活時間
        },commandProperties = {
        @HystrixProperty(name = "execution.isolation.strategy",value = "https://www.cnblogs.com/chafry/p/THREAD")
}
//信號量隔離的設定
@HystrixCommand(
        //用來設定降級方法
        fallbackMethod = "myTestFallbackMethod",
        commandProperties = {
                //進行熔斷配置
                //條件1,設定在滾動時間視窗中,斷路器的最小請求數(沒有達到不會熔斷),默認20,
                @HystrixProperty(name = "circuitBreaker.requestVolumeThreshold" ,value = "https://www.cnblogs.com/chafry/p/10"),
                //條件2,設定斷路器打開的錯誤百分比,在滾動時間內,在請求數量超過requestVolumeThreshold的值,且錯誤請求數的百分比超過這個比例,斷路器就為打開狀態,
                @HystrixProperty(name = "circuitBreaker.errorThresholdPercentage" ,value = "https://www.cnblogs.com/chafry/p/30"),
                //條件3,設定滾動時間窗的長度,單位毫秒,這個時間視窗就是斷路器收集資訊的持續時間,斷路器在收集指標資訊的時會根據這個時間視窗把這個視窗拆分成多個桶,每個桶代表一段時間的指標,默認10000.
                @HystrixProperty(name = "metrics.rollingStats.timeInMilliseconds" ,value = "https://www.cnblogs.com/chafry/p/10000"),
                //設定當斷路器打開之后的休眠時間,休眠時間結束后斷路器為半開狀態,斷路器能接受請求,如果請求失敗又重新回到打開狀態,如果請求成功又回到關閉狀態
                //單位是毫秒
                @HystrixProperty(name = "circuitBreaker.sleepWindowInMilliseconds" ,value = "https://www.cnblogs.com/chafry/p/3000"),

                //配置信號量隔離
                //配置信號量的數值
                @HystrixProperty(name = "execution.isolation.semaphore.maxConcurrentRequests",value = "https://www.cnblogs.com/chafry/p/100"),
                //選擇策略為信號量隔離
                @HystrixProperty(name = "execution.isolation.strategy", value = "https://www.cnblogs.com/chafry/p/SEMAPHORE"),
                //設定HystrixCommand執行的超時時間,單位毫秒
                @HystrixProperty(name = "execution.isolation.thread.timeoutInMilliseconds", value = "https://www.cnblogs.com/chafry/p/1000000000")
        }
)
public String Test(){
    ....
}

public String myTestFallbackMethod() {
    log.info("========myTestFallbackMethod=========");
    return "myTestFallbackMethod";
}

 

Hystrix原始碼總結

  1.采用了AOP的方式來對方法進行了增強,

  2.采用了大量的RxJava回應式編程,利用了Future+執行緒池的方法進行了大量的異步

  3.涉及到了滑動視窗的設計,來進行統計失敗率

 

Hystrix原始碼分析

【1】分析注解@EnableCircuitBreaker是如何開啟斷路器功能

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableCircuitBreakerImportSelector.class)
public @interface EnableCircuitBreaker {}
//注解說明:注釋以啟用斷路器實作
//但實際上只是匯入了EnableCircuitBreakerImportSelector類

【2】深入分析EnableCircuitBreakerImportSelector類做了什么

//會發現什么都沒做,只是將環境變數中的某個值設定為true
@Order(Ordered.LOWEST_PRECEDENCE - 100)
public class EnableCircuitBreakerImportSelector extends SpringFactoryImportSelector<EnableCircuitBreaker> {
    @Override
    protected boolean isEnabled() {
        return getEnvironment().getProperty("spring.cloud.circuit.breaker.enabled",
                Boolean.class, Boolean.TRUE);
    }
}

【3】分析SpringBoot自動裝配會匯入什么

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.netflix.hystrix.HystrixAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerAutoConfiguration,\
org.springframework.cloud.netflix.hystrix.security.HystrixSecurityAutoConfiguration

org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker=\
org.springframework.cloud.netflix.hystrix.HystrixCircuitBreakerConfiguration   //該類會比較重要

【4】分析HystrixCircuitBreakerConfiguration類做了什么

@Configuration(proxyBeanMethods = false)
public class HystrixCircuitBreakerConfiguration {

    //這個看名字就很重要,初始化AOP的攔截
    @Bean
    public HystrixCommandAspect hystrixCommandAspect() {
        return new HystrixCommandAspect();
    }

    @Bean
    public HystrixShutdownHook hystrixShutdownHook() {
        return new HystrixShutdownHook();
    }

    @Bean
    public HasFeatures hystrixFeature() {
        return HasFeatures.namedFeatures(new NamedFeature("Hystrix", HystrixCommandAspect.class));
    }

    private class HystrixShutdownHook implements DisposableBean {

        @Override
        public void destroy() throws Exception {
            // Just call Hystrix to reset thread pool etc.
            Hystrix.reset();
        }

    }
}

 

【5】分析HystrixCommandAspect類在做了什么

//先是定義了兩個切入點
@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand)")
public void hystrixCommandAnnotationPointcut() {}

@Pointcut("@annotation(com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser)")
public void hystrixCollapserAnnotationPointcut() {}

//定義切面
@Around("hystrixCommandAnnotationPointcut() || hystrixCollapserAnnotationPointcut()")
public Object methodsAnnotatedWithHystrixCommand(final ProceedingJoinPoint joinPoint) throws Throwable {
    //通過切點獲取被攔截的方法
    Method method = getMethodFromTarget(joinPoint);
    Validate.notNull(method, "failed to get method from joinPoint: %s", joinPoint);
    if (method.isAnnotationPresent(HystrixCommand.class) && method.isAnnotationPresent(HystrixCollapser.class)) {
        throw new IllegalStateException(...);
    }
    MetaHolderFactory metaHolderFactory = META_HOLDER_FACTORY_MAP.get(HystrixPointcutType.of(method));
    //metaholder中保存了很多和切點相關的資訊,說白了就是決議注解獲得上面的資訊
    MetaHolder metaHolder = metaHolderFactory.create(joinPoint);
    HystrixInvokable invokable = HystrixCommandFactory.getInstance().create(metaHolder);
    ExecutionType executionType = metaHolder.isCollapserAnnotationPresent() ?
            metaHolder.getCollapserExecutionType() : metaHolder.getExecutionType();

    Object result;
    try {
        if (!metaHolder.isObservable()) {
            result = CommandExecutor.execute(invokable, executionType, metaHolder);
        } else {
            result = executeObservable(invokable, executionType, metaHolder);
        }
    } catch (HystrixBadRequestException e) {...} catch (HystrixRuntimeException e) {...}
    return result;
}

【5.1】模式分析---分析MetaHolder的構成

public MetaHolder create(final ProceedingJoinPoint joinPoint) {
    Method method = getMethodFromTarget(joinPoint);
    Object obj = joinPoint.getTarget();
    Object[] args = joinPoint.getArgs();
    Object proxy = joinPoint.getThis();
    return create(proxy, method, obj, args, joinPoint);
}

private static class CommandMetaHolderFactory extends MetaHolderFactory {
    @Override
    public MetaHolder create(Object proxy, Method method, Object obj, Object[] args, final ProceedingJoinPoint joinPoint) {
        HystrixCommand hystrixCommand = method.getAnnotation(HystrixCommand.class);
        ExecutionType executionType = ExecutionType.getExecutionType(method.getReturnType());
        MetaHolder.Builder builder = metaHolderBuilder(proxy, method, obj, args, joinPoint);
        if (isCompileWeaving()) {
            builder.ajcMethod(getAjcMethodFromTarget(joinPoint));
        }
        return builder.defaultCommandKey(method.getName())
                        .hystrixCommand(hystrixCommand)
                        .observableExecutionMode(hystrixCommand.observableExecutionMode())
                        .executionType(executionType)
                        .observable(ExecutionType.OBSERVABLE == executionType)
                        .build();
    }
}

public enum ExecutionType {

    ASYNCHRONOUS,
    SYNCHRONOUS,
    OBSERVABLE;

    //所以根據我們的基本使用可以判斷是SYNCHRONOUS,同步模式
    public static ExecutionType getExecutionType(Class<?> type) {
        if (Future.class.isAssignableFrom(type)) {
            return ExecutionType.ASYNCHRONOUS;
        } else if (Observable.class.isAssignableFrom(type)) {
            return ExecutionType.OBSERVABLE;
        } else {
            return ExecutionType.SYNCHRONOUS;
        }
    }

}

 

【6】分析HystrixInvokable的創建【層層追溯,其實發現是生成一個包裝過的HystrixCommand

//分析HystrixInvokable的創建
public HystrixInvokable create(MetaHolder metaHolder) {
    HystrixInvokable executable;
  //判斷是不是HystrixCollapser注解
if (metaHolder.isCollapserAnnotationPresent()) { executable = new CommandCollapser(metaHolder); } else if (metaHolder.isObservable()) { executable = new GenericObservableCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } else { //主要是這里 executable = new GenericCommand(HystrixCommandBuilderFactory.getInstance().create(metaHolder)); } return executable; } public GenericCommand(HystrixCommandBuilder builder) { super(builder); } protected AbstractHystrixCommand(HystrixCommandBuilder builder) { super(builder.getSetterBuilder().build()); this.commandActions = builder.getCommandActions(); this.collapsedRequests = builder.getCollapsedRequests(); this.cacheResultInvocationContext = builder.getCacheResultInvocationContext(); this.cacheRemoveInvocationContext = builder.getCacheRemoveInvocationContext(); this.ignoreExceptions = builder.getIgnoreExceptions(); this.executionType = builder.getExecutionType(); } HystrixCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { super(group, key, threadPoolKey, circuitBreaker, threadPool, commandPropertiesDefaults, threadPoolPropertiesDefaults, metrics, fallbackSemaphore, executionSemaphore, propertiesStrategy, executionHook); } protected AbstractCommand(HystrixCommandGroupKey group, HystrixCommandKey key, HystrixThreadPoolKey threadPoolKey, HystrixCircuitBreaker circuitBreaker, HystrixThreadPool threadPool, HystrixCommandProperties.Setter commandPropertiesDefaults, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults, HystrixCommandMetrics metrics, TryableSemaphore fallbackSemaphore, TryableSemaphore executionSemaphore, HystrixPropertiesStrategy propertiesStrategy, HystrixCommandExecutionHook executionHook) { this.commandGroup = initGroupKey(group); this.commandKey = initCommandKey(key, getClass()); this.properties = initCommandProperties(this.commandKey, propertiesStrategy, commandPropertiesDefaults); this.threadPoolKey = initThreadPoolKey(threadPoolKey, this.commandGroup, this.properties.executionIsolationThreadPoolKeyOverride().get()); this.metrics = initMetrics(metrics, this.commandGroup, this.threadPoolKey, this.commandKey, this.properties); this.circuitBreaker = initCircuitBreaker(this.properties.circuitBreakerEnabled().get(), circuitBreaker, this.commandGroup, this.commandKey, this.properties, this.metrics); this.threadPool = initThreadPool(threadPool, this.threadPoolKey, threadPoolPropertiesDefaults); //Strategies from plugins this.eventNotifier = HystrixPlugins.getInstance().getEventNotifier(); this.concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy(); HystrixMetricsPublisherFactory.createOrRetrievePublisherForCommand(this.commandKey, this.commandGroup, this.metrics, this.circuitBreaker, this.properties); this.executionHook = initExecutionHook(executionHook); this.requestCache = HystrixRequestCache.getInstance(this.commandKey, this.concurrencyStrategy); this.currentRequestLog = initRequestLog(this.properties.requestLogEnabled().get(), this.concurrencyStrategy); /* fallback semaphore override if applicable */ this.fallbackSemaphoreOverride = fallbackSemaphore; /* execution semaphore override if applicable */ this.executionSemaphoreOverride = executionSemaphore; }

 

【7】那么接下來分析CommandExecutor.execute做了什么

public static Object execute(HystrixInvokable invokable, ExecutionType executionType, MetaHolder metaHolder) throws RuntimeException {
    Validate.notNull(invokable);
    Validate.notNull(metaHolder);

    switch (executionType) {
        //基于上面構成先分析同步方法
        case SYNCHRONOUS: {
            //呼叫HystrixCommand
            return castToExecutable(invokable, executionType).execute();
        }
        case ASYNCHRONOUS: {
            HystrixExecutable executable = castToExecutable(invokable, executionType);
            if (metaHolder.hasFallbackMethodCommand()
                    && ExecutionType.ASYNCHRONOUS == metaHolder.getFallbackExecutionType()) {
                return new FutureDecorator(executable.queue());
            }
            return executable.queue();
        }
        case OBSERVABLE: {
            HystrixObservable observable = castToObservable(invokable);
            return ObservableExecutionMode.EAGER == metaHolder.getObservableExecutionMode() ? observable.observe() : observable.toObservable();
        }
        default:
            throw new RuntimeException("unsupported execution type: " + executionType);
    }
}

//HystrixCommand類#execute方法
public R execute() {
    try {
        //利用了Future模式
        return queue().get();
    } catch (Exception e) {
        throw Exceptions.sneakyThrow(decomposeException(e));
    }
}

【8】分析queue()方法怎么使用Future模式的

//分析queue()方法怎么使用Future模式的
//熟悉執行緒池的,應該知道執行緒池有個FutureTask的任務
//通過持有FutureTask句柄可以異步獲取回傳結果
//本質上就是FutureTask持有
//一個結果存放地址
//執行緒執行的run方法(執行完后將結果放入固定的存放地址)
//那么現在看下面的邏輯就會十分清晰
public Future<R> queue() {
    
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    
    final Future<R> f = new Future<R>() {

        @Override
        public boolean cancel(boolean mayInterruptIfRunning) {
            if (delegate.isCancelled()) {
                return false;
            }

            if (HystrixCommand.this.getProperties().executionIsolationThreadInterruptOnFutureCancel().get()) {
                interruptOnFutureCancel.compareAndSet(false, mayInterruptIfRunning);
            }

            final boolean res = delegate.cancel(interruptOnFutureCancel.get());

            if (!isExecutionComplete() && interruptOnFutureCancel.get()) {
                final Thread t = executionThread.get();
                if (t != null && !t.equals(Thread.currentThread())) {
                    t.interrupt();
                }
            }

            return res;
        }

        @Override
        public boolean isCancelled() {
            return delegate.isCancelled();
        }

        @Override
        public boolean isDone() {
            return delegate.isDone();
        }

        @Override
        public R get() throws InterruptedException, ExecutionException {
            return delegate.get();
        }

        @Override
        public R get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
            return delegate.get(timeout, unit);
        }
        
    };

    /* special handling of error states that throw immediately */
    if (f.isDone()) {
        try {
            f.get();
            return f;
        } catch (Exception e) {
            Throwable t = decomposeException(e);
            if (t instanceof HystrixBadRequestException) {
                return f;
            } else if (t instanceof HystrixRuntimeException) {
                HystrixRuntimeException hre = (HystrixRuntimeException) t;
                switch (hre.getFailureType()) {
                case COMMAND_EXCEPTION:
                case TIMEOUT:
                    // we don't throw these types from queue() only from queue().get() as they are execution errors
                    return f;
                default:
                    // these are errors we throw from queue() as they as rejection type errors
                    throw hre;
                }
            } else {
                throw Exceptions.sneakyThrow(t);
            }
        }
    }
    //也就是將產生的Future物件回傳
    return f;
}

【9】分析結果的獲取是從delegate屬性中獲取,它被定義為一個觀察者

//定義觀察者
public Observable<R> toObservable() {
    final AbstractCommand<R> _cmd = this;

    //doOnCompleted handler already did all of the SUCCESS work
    //doOnError handler already did all of the FAILURE/TIMEOUT/REJECTION/BAD_REQUEST work
    //第一個觀察者,命令執行結束后的清理者
    final Action0 terminateCommandCleanup = new Action0() {
        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.TERMINAL)) {
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.TERMINAL)) {
                handleCommandEnd(true); //user code did run
            }
        }
    };

    //mark the command as CANCELLED and store the latency (in addition to standard cleanup)
    //第二個觀察者,取消訂閱時處理者
    final Action0 unsubscribeCommandCleanup = new Action0() {
        @Override
        public void call() {
            if (_cmd.commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    try {
                        executionHook.onUnsubscribe(_cmd);
                    } catch (Throwable hookEx) {...}
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(false); //user code never ran
            } else if (_cmd.commandState.compareAndSet(CommandState.USER_CODE_EXECUTED, CommandState.UNSUBSCRIBED)) {
                if (!_cmd.executionResult.containsTerminalEvent()) {
                    _cmd.eventNotifier.markEvent(HystrixEventType.CANCELLED, _cmd.commandKey);
                    try {
                        executionHook.onUnsubscribe(_cmd);
                    } catch (Throwable hookEx) {...}
                    _cmd.executionResultAtTimeOfCancellation = _cmd.executionResult
                            .addEvent((int) (System.currentTimeMillis() - _cmd.commandStartTimestamp), HystrixEventType.CANCELLED);
                }
                handleCommandEnd(true); //user code did run
            }
        }
    };
    //第三個觀察者,重點:Hystrix 核心邏輯: 斷路器、隔離
    final Func0<Observable<R>> applyHystrixSemantics = new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            if (commandState.get().equals(CommandState.UNSUBSCRIBED)) {
                return Observable.never();
            }
            return applyHystrixSemantics(_cmd);
        }
    };
    //第四個觀察者,發射資料(OnNext表示發射資料)時的Hook
    final Func1<R, R> wrapWithAllOnNextHooks = new Func1<R, R>() {
        @Override
        public R call(R r) {
            R afterFirstApplication = r;

            try {
                afterFirstApplication = executionHook.onComplete(_cmd, r);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onComplete", hookEx);
            }

            try {
                return executionHook.onEmit(_cmd, afterFirstApplication);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onEmit", hookEx);
                return afterFirstApplication;
            }
        }
    };
    //第五個觀察者,命令執行完成的Hook
    final Action0 fireOnCompletedHook = new Action0() {
        @Override
        public void call() {
            try {
                executionHook.onSuccess(_cmd);
            } catch (Throwable hookEx) {
                logger.warn("Error calling HystrixCommandExecutionHook.onSuccess", hookEx);
            }
        }
    };
   //進行包裝
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
             /* this is a stateful object so can only be used once */
            if (!commandState.compareAndSet(CommandState.NOT_STARTED, CommandState.OBSERVABLE_CHAIN_CREATED)) {
                ..省略拋出例外..
            }

            commandStartTimestamp = System.currentTimeMillis();

            if (properties.requestLogEnabled().get()) {
                // log this command execution regardless of what happened
                if (currentRequestLog != null) {
                    currentRequestLog.addExecutedCommand(_cmd);
                }
            }

            final boolean requestCacheEnabled = isRequestCachingEnabled();
            final String cacheKey = getCacheKey();

            /* try from cache first */
            if (requestCacheEnabled) {
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.get(cacheKey);
                if (fromCache != null) {
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                }
            }
       // 使用上面的Func0:applyHystrixSemantics 來創建Observable
            Observable<R> hystrixObservable =
                    Observable.defer(applyHystrixSemantics)
                            .map(wrapWithAllOnNextHooks);

            Observable<R> afterCache;

            // 設定快取邏輯,不太重要
            if (requestCacheEnabled && cacheKey != null) {
                // wrap it for caching
                HystrixCachedObservable<R> toCache = HystrixCachedObservable.from(hystrixObservable, _cmd);
                HystrixCommandResponseFromCache<R> fromCache = (HystrixCommandResponseFromCache<R>) requestCache.putIfAbsent(cacheKey, toCache);
                if (fromCache != null) {
                    // another thread beat us so we'll use the cached value instead
                    toCache.unsubscribe();
                    isResponseFromCache = true;
                    return handleRequestCacheHitAndEmitValues(fromCache, _cmd);
                } else {
                    // we just created an ObservableCommand so we cast and return it
                    afterCache = toCache.toObservable();
                }
            } else {
                afterCache = hystrixObservable;
            }
       //鏈式
            return afterCache
                    .doOnTerminate(terminateCommandCleanup)     // perform cleanup once (either on normal terminal state (this line), or unsubscribe (next line))
                    .doOnUnsubscribe(unsubscribeCommandCleanup) // perform cleanup once
                    .doOnCompleted(fireOnCompletedHook);
        }
    });
}

 

【10】分析核心applyHystrixSemantics方法

private Observable<R> applyHystrixSemantics(final AbstractCommand<R> _cmd) {
    //執行命令開始執行的鉤子方法 可能有人會問 前面系結了那么多的鉤子方法 這里怎么才開始
    //start 因為前面系結但是并沒有執行, 當有訂閱者訂閱 這里才是開始執行的代碼邏輯
    executionHook.onStart(_cmd);

    //判斷斷路器是否開啟
    if (circuitBreaker.allowRequest()) {
        //如果是信號量隔離  回傳TryableSemaphoreActual 根據設定的并發量來判斷是否能執行,如果不能執行,進入fallback,
        //如果是執行緒池隔離 回傳TryableSemaphoreNoOp  直接回傳true沒有任何操作
        final TryableSemaphore executionSemaphore = getExecutionSemaphore();
        final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
        final Action0 singleSemaphoreRelease = new Action0() {
            @Override
            public void call() {
                if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                    executionSemaphore.release();
                }
            }
        };

        final Action1<Throwable> markExceptionThrown = new Action1<Throwable>() {
            @Override
            public void call(Throwable t) {
                eventNotifier.markEvent(HystrixEventType.EXCEPTION_THROWN, commandKey);
            }
        };

        //判斷能否正常執行
        if (executionSemaphore.tryAcquire()) {
            try {
                /* used to track userThreadExecutionTime */
                executionResult = executionResult.setInvocationStartTime(System.currentTimeMillis());
                //核心方法
                return executeCommandAndObserve(_cmd)
                        .doOnError(markExceptionThrown)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } catch (RuntimeException e) {
                return Observable.error(e);
            }
        } else {
            //信號量執行的時候并發太大直接回退
            return handleSemaphoreRejectionViaFallback();
        }
    } else {
        //執行降級
        return handleShortCircuitViaFallback();
    }
}

//TryableSemaphoreActual類#tryAcquire方法
@Override
public boolean tryAcquire() {
    int currentCount = count.incrementAndGet();
    if (currentCount > numberOfPermits.get()) {
        count.decrementAndGet();
        return false;
    } else {
        return true;
    }
}

//TryableSemaphoreNoOp類#tryAcquire方法
@Override
public boolean tryAcquire() {
    return true;
}

 

【11】分析allowRequest方法是怎么判斷是否允許通過的

@Override
public boolean allowRequest() {
    if (properties.circuitBreakerForceOpen().get()) {
        // 屬性要求我們強制打開電路,這樣我們將允許NO請求
        return false;
    }
    if (properties.circuitBreakerForceClosed().get()) {
        // 我們仍然希望允許isOpen()執行它的計算,因此我們模擬正常的行為
        isOpen();
        // 屬性要求我們忽略錯誤,所以我們將忽略isOpen的結果,只允許所有的流量通過
        return true;
    }
    return !isOpen() || allowSingleTest();
}

@Override
public boolean isOpen() {
    //如果斷路器打開立即回傳true
    if (circuitOpen.get()) {
        return true;
    }

    // we're closed, so let's see if errors have made us so we should trip the circuit open
    HealthCounts health = metrics.getHealthCounts();

    // check if we are past the statisticalWindowVolumeThreshold
    if (health.getTotalRequests() < properties.circuitBreakerRequestVolumeThreshold().get()) {
        // we are not past the minimum volume threshold for the statisticalWindow so we'll return false immediately and not calculate anything
        return false;
    }

    if (health.getErrorPercentage() < properties.circuitBreakerErrorThresholdPercentage().get()) {
        return false;
    } else {
        // our failure rate is too high, trip the circuit
        if (circuitOpen.compareAndSet(false, true)) {
            // if the previousValue was false then we want to set the currentTime
            circuitOpenedOrLastTestedTime.set(System.currentTimeMillis());
            return true;
        } else {
            // How could previousValue be true? If another thread was going through this code at the same time a race-condition could have
            // caused another thread to set it to true already even though we were in the process of doing the same
            // In this case, we know the circuit is open, so let the other thread set the currentTime and report back that the circuit is open
            return true;
        }
    }
}

public boolean allowSingleTest() {
    long timeCircuitOpenedOrWasLastTested = circuitOpenedOrLastTestedTime.get();
    // 1) 如果斷路器是打開的
    // 2) 且已經過了休眠時間,嘗試打開
    if (circuitOpen.get() && System.currentTimeMillis() > timeCircuitOpenedOrWasLastTested + properties.circuitBreakerSleepWindowInMilliseconds().get()) {
        //已經過了休眠時間,允許一個請求嘗試,
        //如果成功,斷路器被關閉,
        if (circuitOpenedOrLastTestedTime.compareAndSet(timeCircuitOpenedOrWasLastTested, System.currentTimeMillis())) {
            //如果這個回傳true,意味著我們設定了時間,因此我們將回傳true以允許單次嘗試
            //如果它回傳false,這意味著另一個執行緒在我們之前運行并允許單次嘗試
            return true;
        }
    }
    return false;
}

 

【12】分析降級的邏輯

private Observable<R> getFallbackOrThrowException(final AbstractCommand<R> _cmd, final HystrixEventType eventType, final FailureType failureType, final String message, final Exception originalException) {
    final HystrixRequestContext requestContext = HystrixRequestContext.getContextForCurrentThread();
    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
    // record the executionResult
    // do this before executing fallback so it can be queried from within getFallback (see See https://github.com/Netflix/Hystrix/pull/144)
    executionResult = executionResult.addEvent((int) latency, eventType);

    if (shouldNotBeWrapped(originalException)){
        /* executionHook for all errors */
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(e);
    } else if (isUnrecoverable(originalException)) {
        logger.error("Unrecoverable Error for HystrixCommand so will throw HystrixRuntimeException and not apply fallback. ", originalException);

        /* executionHook for all errors */
        Exception e = wrapWithOnErrorHook(failureType, originalException);
        return Observable.error(new HystrixRuntimeException(failureType, this.getClass(), getLogMessagePrefix() + " " + message + " and encountered unrecoverable error.", e, null));
    } else {
        if (isRecoverableError(originalException)) {
            logger.warn("Recovered from java.lang.Error by serving Hystrix fallback", originalException);
        }

        if (properties.fallbackEnabled().get()) {
            /* fallback behavior is permitted so attempt */

            final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
                @Override
                public void call(Notification<? super R> rNotification) {
                    setRequestContextIfNeeded(requestContext);
                }
            };

            final Action1<R> markFallbackEmit = new Action1<R>() {
                @Override
                public void call(R r) {
                    if (shouldOutputOnNextEvents()) {
                        executionResult = executionResult.addEvent(HystrixEventType.FALLBACK_EMIT);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_EMIT, commandKey);
                    }
                }
            };

            final Action0 markFallbackCompleted = new Action0() {
                @Override
                public void call() {
                    long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                    eventNotifier.markEvent(HystrixEventType.FALLBACK_SUCCESS, commandKey);
                    executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_SUCCESS);
                }
            };

            final Func1<Throwable, Observable<R>> handleFallbackError = new Func1<Throwable, Observable<R>>() {
                @Override
                public Observable<R> call(Throwable t) {
                    Exception e = originalException;
                    Exception fe = getExceptionFromThrowable(t);

                    if (fe instanceof UnsupportedOperationException) {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        logger.debug("No fallback for HystrixCommand. ", fe); // debug only since we're throwing the exception and someone higher will do something with it
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_MISSING, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_MISSING);

                        /* executionHook for all errors */
                        e = wrapWithOnErrorHook(failureType, e);

                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and no fallback available.", e, fe));
                    } else {
                        long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                        logger.debug("HystrixCommand execution " + failureType.name() + " and fallback failed.", fe);
                        eventNotifier.markEvent(HystrixEventType.FALLBACK_FAILURE, commandKey);
                        executionResult = executionResult.addEvent((int) latency, HystrixEventType.FALLBACK_FAILURE);

                        /* executionHook for all errors */
                        e = wrapWithOnErrorHook(failureType, e);

                        return Observable.error(new HystrixRuntimeException(failureType, _cmd.getClass(), getLogMessagePrefix() + " " + message + " and fallback failed.", e, fe));
                    }
                }
            };

            final TryableSemaphore fallbackSemaphore = getFallbackSemaphore();
            final AtomicBoolean semaphoreHasBeenReleased = new AtomicBoolean(false);
            final Action0 singleSemaphoreRelease = new Action0() {
                @Override
                public void call() {
                    if (semaphoreHasBeenReleased.compareAndSet(false, true)) {
                        fallbackSemaphore.release();
                    }
                }
            };

            Observable<R> fallbackExecutionChain;

            //上面那些定義的其實都不會在這里呼叫,主要是看下面的
            // acquire a permit
            if (fallbackSemaphore.tryAcquire()) {
                try {
                    if (isFallbackUserDefined()) {
                        executionHook.onFallbackStart(this);
                        //HystrixCommand類#getFallbackObservable
                        fallbackExecutionChain = getFallbackObservable();
                    } else {
                        //same logic as above without the hook invocation
                        fallbackExecutionChain = getFallbackObservable();
                    }
                } catch (Throwable ex) {
                    //If hook or user-fallback throws, then use that as the result of the fallback lookup
                    fallbackExecutionChain = Observable.error(ex);
                }

                return fallbackExecutionChain
                        .doOnEach(setRequestContext)
                        .lift(new FallbackHookApplication(_cmd))
                        .lift(new DeprecatedOnFallbackHookApplication(_cmd))
                        .doOnNext(markFallbackEmit)
                        .doOnCompleted(markFallbackCompleted)
                        .onErrorResumeNext(handleFallbackError)
                        .doOnTerminate(singleSemaphoreRelease)
                        .doOnUnsubscribe(singleSemaphoreRelease);
            } else {
               return handleFallbackRejectionByEmittingError();
            }
        } else {
            return handleFallbackDisabledByEmittingError(originalException, failureType, message);
        }
    }
}

//HystrixCommand類#getFallbackObservable
@Override
final protected Observable<R> getFallbackObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                //呼叫GenericCommand類的getFallback方法【子類重新寫父類】
                return Observable.just(getFallback());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    });
}

//GenericCommand類#getFallback方法
@Override
protected Object getFallback() {
    final CommandAction commandAction = getFallbackAction();
    if (commandAction != null) {
        try {
            return process(new Action() {
                @Override
                Object execute() {
                    MetaHolder metaHolder = commandAction.getMetaHolder();
                    Object[] args = createArgsForFallback(metaHolder, getExecutionException());
                    return commandAction.executeWithArgs(metaHolder.getFallbackExecutionType(), args);
                }
            });
        } catch (Throwable e) {
            LOGGER.error(FallbackErrorMessageBuilder.create()
                    .append(commandAction, e).build());
            throw new FallbackInvocationException(unwrapCause(e));
        }
    } else {
        return super.getFallback();
    }
}

 

【13】分析核心的executeCommandAndObserve執行邏輯

private Observable<R> executeCommandAndObserve(final AbstractCommand<R> _cmd) {
    final HystrixRequestContext currentRequestContext = HystrixRequestContext.getContextForCurrentThread();
    //主要是來對HystrixCommand和HystrixObservableCommand記錄的事件是不同的
    final Action1<R> markEmits = new Action1<R>() {
        @Override
        public void call(R r) {
            if (shouldOutputOnNextEvents()) {
                executionResult = executionResult.addEvent(HystrixEventType.EMIT);
                eventNotifier.markEvent(HystrixEventType.EMIT, commandKey);
            }
            if (commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };

    final Action0 markOnCompleted = new Action0() {
        @Override
        public void call() {
            if (!commandIsScalar()) {
                long latency = System.currentTimeMillis() - executionResult.getStartTimestamp();
                eventNotifier.markCommandExecution(getCommandKey(), properties.executionIsolationStrategy().get(), (int) latency, executionResult.getOrderedList());
                eventNotifier.markEvent(HystrixEventType.SUCCESS, commandKey);
                executionResult = executionResult.addEvent((int) latency, HystrixEventType.SUCCESS);
                circuitBreaker.markSuccess();
            }
        }
    };

    //執行失敗的邏輯定義 
    final Func1<Throwable, Observable<R>> handleFallback = new Func1<Throwable, Observable<R>>() {
        @Override
        public Observable<R> call(Throwable t) {
            Exception e = getExceptionFromThrowable(t);
            executionResult = executionResult.setExecutionException(e);
            if (e instanceof RejectedExecutionException) {
                return handleThreadPoolRejectionViaFallback(e);
            } else if (t instanceof HystrixTimeoutException) {
                return handleTimeoutViaFallback();
            } else if (t instanceof HystrixBadRequestException) {
                return handleBadRequestByEmittingError(e);
            } else {
                /*
                 * Treat HystrixBadRequestException from ExecutionHook like a plain HystrixBadRequestException.
                 */
                if (e instanceof HystrixBadRequestException) {
                    eventNotifier.markEvent(HystrixEventType.BAD_REQUEST, commandKey);
                    return Observable.error(e);
                }

                return handleFailureViaFallback(e);
            }
        }
    };

    final Action1<Notification<? super R>> setRequestContext = new Action1<Notification<? super R>>() {
        @Override
        public void call(Notification<? super R> rNotification) {
            setRequestContextIfNeeded(currentRequestContext);
        }
    };

    //上面定義的都是一些異步呼叫事件,主體在這里
    Observable<R> execution;
    //如果超時開啟  使用HystrixObservableTimeoutOperator來對Observable做超時處理,
    //所以不管是信號量隔離還是執行緒池隔離都會走該邏輯進行超時控制,
    if (properties.executionTimeoutEnabled().get()) {
        //看名字就知道是特殊的隔離,也就是隔離邏輯所在
        execution = executeCommandWithSpecifiedIsolation(_cmd)
                .lift(new HystrixObservableTimeoutOperator<R>(_cmd));
    } else {
        execution = executeCommandWithSpecifiedIsolation(_cmd);
    }

    return execution.doOnNext(markEmits)
            .doOnCompleted(markOnCompleted)
            .onErrorResumeNext(handleFallback)
            .doOnEach(setRequestContext);
}

private Observable<R> executeCommandWithSpecifiedIsolation(final AbstractCommand<R> _cmd) {
    if (properties.executionIsolationStrategy().get() == ExecutionIsolationStrategy.THREAD) {
        // mark that we are executing in a thread (even if we end up being rejected we still were a THREAD execution and not SEMAPHORE)
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.THREAD);

                if (isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT) {
                    // the command timed out in the wrapping thread so we will return immediately
                    // and not increment any of the counters below or other such logic
                    return Observable.error(new RuntimeException("timed out before executing run()"));
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.STARTED)) {
                    //we have not been unsubscribed, so should proceed
                    HystrixCounters.incrementGlobalConcurrentThreads();
                    threadPool.markThreadExecution();
                    // store the command that is being run
                    endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                    executionResult = executionResult.setExecutedInThread();
                    /**
                     * If any of these hooks throw an exception, then it appears as if the actual execution threw an error
                     */
                    try {
                        executionHook.onThreadStart(_cmd);
                        executionHook.onRunStart(_cmd);
                        executionHook.onExecutionStart(_cmd);
                        return getUserExecutionObservable(_cmd);
                    } catch (Throwable ex) {
                        return Observable.error(ex);
                    }
                } else {
                    //command has already been unsubscribed, so return immediately
                    return Observable.error(new RuntimeException("unsubscribed before executing run()"));
                }
            }
        }).doOnTerminate(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.TERMINAL)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.TERMINAL)) {
                    //if it was never started and received terminal, then no need to clean up (I don't think this is possible)
                }
                //if it was unsubscribed, then other cleanup handled it
            }
        }).doOnUnsubscribe(new Action0() {
            @Override
            public void call() {
                if (threadState.compareAndSet(ThreadState.STARTED, ThreadState.UNSUBSCRIBED)) {
                    handleThreadEnd(_cmd);
                }
                if (threadState.compareAndSet(ThreadState.NOT_USING_THREAD, ThreadState.UNSUBSCRIBED)) {
                    //if it was never started and was cancelled, then no need to clean up
                }
                //if it was terminal, then other cleanup handled it
            }
        }).subscribeOn(threadPool.getScheduler(new Func0<Boolean>() {
            //執行緒池隔離呼叫
            @Override
            public Boolean call() {
                return properties.executionIsolationThreadInterruptOnTimeout().get() && _cmd.isCommandTimedOut.get() == TimedOutStatus.TIMED_OUT;
            }
        }));
    } else {
        return Observable.defer(new Func0<Observable<R>>() {
            @Override
            public Observable<R> call() {
                executionResult = executionResult.setExecutionOccurred();
                if (!commandState.compareAndSet(CommandState.OBSERVABLE_CHAIN_CREATED, CommandState.USER_CODE_EXECUTED)) {
                    return Observable.error(new IllegalStateException("execution attempted while in state : " + commandState.get().name()));
                }

                metrics.markCommandStart(commandKey, threadPoolKey, ExecutionIsolationStrategy.SEMAPHORE);
                // semaphore isolated
                // store the command that is being run
                endCurrentThreadExecutingCommand = Hystrix.startCurrentThreadExecutingCommand(getCommandKey());
                try {
                    executionHook.onRunStart(_cmd);
                    executionHook.onExecutionStart(_cmd);
                    return getUserExecutionObservable(_cmd);  //the getUserExecutionObservable method already wraps sync exceptions, so this shouldn't throw
                } catch (Throwable ex) {
                    //If the above hooks throw, then use that as the result of the run method
                    return Observable.error(ex);
                }
            }
        });
    }
}

private Observable<R> getUserExecutionObservable(final AbstractCommand<R> _cmd) {
    Observable<R> userObservable;

    try {
        userObservable = getExecutionObservable();
    } catch (Throwable ex) {
        // the run() method is a user provided implementation so can throw instead of using Observable.onError
        // so we catch it here and turn it into Observable.error
        userObservable = Observable.error(ex);
    }

    return userObservable
            .lift(new ExecutionHookApplication(_cmd))
            .lift(new DeprecatedOnRunHookApplication(_cmd));
}

@Override
final protected Observable<R> getExecutionObservable() {
    return Observable.defer(new Func0<Observable<R>>() {
        @Override
        public Observable<R> call() {
            try {
                //呼叫GenericCommand類的run方法【子類重新寫父類】
                return Observable.just(run());
            } catch (Throwable ex) {
                return Observable.error(ex);
            }
        }
    }).doOnSubscribe(new Action0() {
        @Override
        public void call() {
            // Save thread on which we get subscribed so that we can interrupt it later if needed
            executionThread.set(Thread.currentThread());
        }
    });
}
//GenericCommand類#run方法
@Override
protected Object run() throws Exception {
    LOGGER.debug("execute command: {}", getCommandKey().name());
    return process(new Action() {
        @Override
        Object execute() {
            return getCommandAction().execute(getExecutionType());
        }
    });
}

【14】分析隔離執行緒池的執行緒隔離threadPool.getScheduler的初始化【位于第六步里面】

private static HystrixThreadPool initThreadPool(HystrixThreadPool fromConstructor, HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter threadPoolPropertiesDefaults) {
    if (fromConstructor == null) {
        // get the default implementation of HystrixThreadPool
        //通過塞入注解資訊threadPoolPropertiesDefaults進行初始化
        return HystrixThreadPool.Factory.getInstance(threadPoolKey, threadPoolPropertiesDefaults);
    } else {
        return fromConstructor;
    }
}

static HystrixThreadPool getInstance(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesBuilder) {
    // get the key to use instead of using the object itself so that if people forget to implement equals/hashcode things will still work
    String key = threadPoolKey.name();

    // this should find it for all but the first time
    HystrixThreadPool previouslyCached = threadPools.get(key);
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // if we get here this is the first time so we need to initialize
    synchronized (HystrixThreadPool.class) {
        if (!threadPools.containsKey(key)) {
            threadPools.put(key, new HystrixThreadPoolDefault(threadPoolKey, propertiesBuilder));
        }
    }
    return threadPools.get(key);
}

public HystrixThreadPoolDefault(HystrixThreadPoolKey threadPoolKey, HystrixThreadPoolProperties.Setter propertiesDefaults) {
    this.properties = HystrixPropertiesFactory.getThreadPoolProperties(threadPoolKey, propertiesDefaults);
    HystrixConcurrencyStrategy concurrencyStrategy = HystrixPlugins.getInstance().getConcurrencyStrategy();
    this.queueSize = properties.maxQueueSize().get();

    this.metrics = HystrixThreadPoolMetrics.getInstance(threadPoolKey,
            concurrencyStrategy.getThreadPool(threadPoolKey, properties),
            properties);
    this.threadPool = this.metrics.getThreadPool();
    this.queue = this.threadPool.getQueue();

    /* strategy: HystrixMetricsPublisherThreadPool */
    HystrixMetricsPublisherFactory.createOrRetrievePublisherForThreadPool(threadPoolKey, this.metrics, this.properties);
}

 

【15】斷路器初始化分析【位于第六步里面】

private static HystrixCircuitBreaker initCircuitBreaker(boolean enabled, HystrixCircuitBreaker fromConstructor,
                                                        HystrixCommandGroupKey groupKey, HystrixCommandKey commandKey,
                                                        HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // 如果啟用了熔斷器
    if (enabled) {
        // 若commandKey沒有對應的CircuitBreaker,則創建
        if (fromConstructor == null) {
            // get the default implementation of HystrixCircuitBreaker
            return HystrixCircuitBreaker.Factory.getInstance(commandKey, groupKey, properties, metrics);
        } else {
            // 如果有則回傳現有的
            return fromConstructor;
        }
    } else {
        return new NoOpCircuitBreaker();
    }
}

//circuitBreaker以commandKey為維度,每個commandKey都會有對應的circuitBreaker
public static HystrixCircuitBreaker getInstance(HystrixCommandKey key, HystrixCommandGroupKey group, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    // 如果有則回傳現有的, key.name()即command的name作為檢索條件
    HystrixCircuitBreaker previouslyCached = circuitBreakersByCommand.get(key.name());
    if (previouslyCached != null) {
        return previouslyCached;
    }

    // 如果沒有則創建并cache
    HystrixCircuitBreaker cbForCommand = circuitBreakersByCommand.putIfAbsent(key.name(), new HystrixCircuitBreakerImpl(key, group, properties, metrics));
    if (cbForCommand == null) {
        // this means the putIfAbsent step just created a new one so let's retrieve and return it
        return circuitBreakersByCommand.get(key.name());
    } else {
        // this means a race occurred and while attempting to 'put' another one got there before
        // and we instead retrieved it and will now return it
        return cbForCommand;
    }
}

protected HystrixCircuitBreakerImpl(HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixCommandProperties properties, HystrixCommandMetrics metrics) {
    this.properties = properties;
    this.metrics = metrics;
}

 【16】健康分析【涉及到滑動視窗的設計】

HystrixCommandMetrics(final HystrixCommandKey key, HystrixCommandGroupKey commandGroup, HystrixThreadPoolKey threadPoolKey, HystrixCommandProperties properties, HystrixEventNotifier eventNotifier) {
    super(null);
    this.key = key;
    this.group = commandGroup;
    this.threadPoolKey = threadPoolKey;
    this.properties = properties;

    healthCountsStream = HealthCountsStream.getInstance(key, properties);
    rollingCommandEventCounterStream = RollingCommandEventCounterStream.getInstance(key, properties);
    cumulativeCommandEventCounterStream = CumulativeCommandEventCounterStream.getInstance(key, properties);

    rollingCommandLatencyDistributionStream = RollingCommandLatencyDistributionStream.getInstance(key, properties);
    rollingCommandUserLatencyDistributionStream = RollingCommandUserLatencyDistributionStream.getInstance(key, properties);
    rollingCommandMaxConcurrencyStream = RollingCommandMaxConcurrencyStream.getInstance(key, properties);
}

public static HealthCountsStream getInstance(HystrixCommandKey commandKey, HystrixCommandProperties properties) {
    //默認是按500ms為一個段
    final int healthCountBucketSizeInMs = properties.metricsHealthSnapshotIntervalInMilliseconds().get();
    if (healthCountBucketSizeInMs == 0) {
        throw new RuntimeException(...);
    }
    //獲得桶數:設定的時間/段的時間
    final int numHealthCountBuckets = properties.metricsRollingStatisticalWindowInMilliseconds().get() / healthCountBucketSizeInMs;
    //初始化
    return getInstance(commandKey, numHealthCountBuckets, healthCountBucketSizeInMs);
}

public static HealthCountsStream getInstance(HystrixCommandKey commandKey, int numBuckets, int bucketSizeInMs) {
    HealthCountsStream initialStream = streams.get(commandKey.name());
    if (initialStream != null) {
        return initialStream;
    } else {
        final HealthCountsStream healthStream;
        synchronized (HealthCountsStream.class) {
            HealthCountsStream existingStream = streams.get(commandKey.name());
            if (existingStream == null) {
                //新建邏輯
                HealthCountsStream newStream = new HealthCountsStream(commandKey, numBuckets, bucketSizeInMs,
                        HystrixCommandMetrics.appendEventToBucket);

                streams.putIfAbsent(commandKey.name(), newStream);
                healthStream = newStream;
            } else {
                healthStream = existingStream;
            }
        }
        healthStream.startCachingStreamValuesIfUnstarted();
        return healthStream;
    }
}

private HealthCountsStream(final HystrixCommandKey commandKey, final int numBuckets, final int bucketSizeInMs,
                           Func2<long[], HystrixCommandCompletion, long[]> reduceCommandCompletion) {
    super(HystrixCommandCompletionStream.getInstance(commandKey), numBuckets, bucketSizeInMs, reduceCommandCompletion, healthCheckAccumulator);
}

public abstract class BucketedRollingCounterStream<Event extends HystrixEvent, Bucket, Output> extends BucketedCounterStream<Event, Bucket, Output> {
    private Observable<Output> sourceStream;
    private final AtomicBoolean isSourceCurrentlySubscribed = new AtomicBoolean(false);

    protected BucketedRollingCounterStream(HystrixEventStream<Event> stream, final int numBuckets, int bucketSizeInMs, final Func2<Bucket, Event, Bucket> appendRawEventToBucket, final Func2<Output, Bucket, Output> reduceBucket) {
        //呼叫父級方法
        super(stream, numBuckets, bucketSizeInMs, appendRawEventToBucket);
        Func1<Observable<Bucket>, Observable<Output>> reduceWindowToSummary = new Func1<Observable<Bucket>, Observable<Output>>() {
            @Override
            public Observable<Output> call(Observable<Bucket> window) {
                return window.scan(getEmptyOutputValue(), reduceBucket).skip(numBuckets);
            }
        };
        this.sourceStream = bucketedStream      //stream broken up into buckets
                .window(numBuckets, 1)          //emit overlapping windows of buckets
                .flatMap(reduceWindowToSummary) //convert a window of bucket-summaries into a single summary
                .doOnSubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(true);
                    }
                })
                .doOnUnsubscribe(new Action0() {
                    @Override
                    public void call() {
                        isSourceCurrentlySubscribed.set(false);
                    }
                })
                .share()                        //multiple subscribers should get same data
                .onBackpressureDrop();          //if there are slow consumers, data should not buffer
    }

    @Override
    public Observable<Output> observe() {
        return sourceStream;
    }

    /* package-private */ boolean isSourceCurrentlySubscribed() {
        return isSourceCurrentlySubscribed.get();
    }
}

public abstract class BucketedCounterStream<Event extends HystrixEvent, Bucket, Output> {
    protected final int numBuckets;
    protected final Observable<Bucket> bucketedStream;
    protected final AtomicReference<Subscription> subscription = new AtomicReference<Subscription>(null);

    private final Func1<Observable<Event>, Observable<Bucket>> reduceBucketToSummary;

    private final BehaviorSubject<Output> counterSubject = BehaviorSubject.create(getEmptyOutputValue());

    protected BucketedCounterStream(final HystrixEventStream<Event> inputEventStream, final int numBuckets, final int bucketSizeInMs,
                                    final Func2<Bucket, Event, Bucket> appendRawEventToBucket) {
        this.numBuckets = numBuckets;
        this.reduceBucketToSummary = new Func1<Observable<Event>, Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call(Observable<Event> eventBucket) {
                return eventBucket.reduce(getEmptyBucketSummary(), appendRawEventToBucket);
            }
        };

        final List<Bucket> emptyEventCountsToStart = new ArrayList<Bucket>();
        for (int i = 0; i < numBuckets; i++) {
            emptyEventCountsToStart.add(getEmptyBucketSummary());
        }

        this.bucketedStream = Observable.defer(new Func0<Observable<Bucket>>() {
            @Override
            public Observable<Bucket> call() {
                return inputEventStream
                        .observe()
                        .window(bucketSizeInMs, TimeUnit.MILLISECONDS) //bucket it by the counter window so we can emit to the next operator in time chunks, not on every OnNext
                        .flatMap(reduceBucketToSummary)                //for a given bucket, turn it into a long array containing counts of event types
                        .startWith(emptyEventCountsToStart);           //start it with empty arrays to make consumer logic as generic as possible (windows are always full)
            }
        });
    }

    abstract Bucket getEmptyBucketSummary();

    abstract Output getEmptyOutputValue();

    /**
     * Return the stream of buckets
     * @return stream of buckets
     */
    public abstract Observable<Output> observe();

    public void startCachingStreamValuesIfUnstarted() {
        if (subscription.get() == null) {
            //the stream is not yet started
            Subscription candidateSubscription = observe().subscribe(counterSubject);
            if (subscription.compareAndSet(null, candidateSubscription)) {
                //won the race to set the subscription
            } else {
                //lost the race to set the subscription, so we need to cancel this one
                candidateSubscription.unsubscribe();
            }
        }
    }

    /**
     * Synchronous call to retrieve the last calculated bucket without waiting for any emissions
     * @return last calculated bucket
     */
    public Output getLatest() {
        startCachingStreamValuesIfUnstarted();
        if (counterSubject.hasValue()) {
            return counterSubject.getValue();
        } else {
            return getEmptyOutputValue();
        }
    }

    public void unsubscribe() {
        Subscription s = subscription.get();
        if (s != null) {
            s.unsubscribe();
            subscription.compareAndSet(s, null);
        }
    }
}

 

Hystrix原始碼分析圖

 

 

 

MetaHolder

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/518862.html

標籤:其他

上一篇:Magnet: Push-based Shuffle Service for Large-scale Data Processing

下一篇:Codeforces1695 D1.+D2 Tree Queries

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more