CircuitBreaker 斷路器
服務熔斷是為了保護我們的服務,比如當某個服務出現問題的時候,控制打向它的流量,讓它有時間去恢復,或者限制一段時間只能有固定數量的請求打向這個服務,這些都是保護措施,我在實際作業中也確實遇到過,資料庫出現問題了,進而導致Web服務出現問題了,導致不依賴資料庫的服務也出現問題了,出現一連串問題, 這次學習《玩轉 Spring 全家桶》,丁雪豐老師給了使用resilience4j的例子, 丁老師的例子是2019年的,這個框架已經修改了些方法,所以我自己也花了些時間來理解了它的用法,現將程序記錄下來,
首先POM檔案引入
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>2.0.2</version>
</dependency>
接著改造之前的Controller方法
@RestController
@RequestMapping("/customer")
@Slf4j
public class BookController {
@Autowired
private BookService bookService;
private CircuitBreaker circuitBreaker;
public BookController(CircuitBreakerRegistry registry) {
circuitBreaker = registry.circuitBreaker("menu");
}
@GetMapping("/menu")
public List<Book> readMenu() {
Supplier<List<Book>> supplier = () -> bookService.getAll();
circuitBreaker.getEventPublisher()
.onEvent(event -> log.info(event.toString()));
try{
return circuitBreaker.executeSupplier(supplier);
}
catch (Exception ex)
{
log.error(ex.getMessage());
return Collections.emptyList();
}
}
}
不同的地方就是引入了CircuitBreaker, 然后使用它將我們的方法“bookService.getAll()”包起來了,
然后在組態檔中添加如下的配置
resilience4j.circuitbreaker.backends.menu.failure-rate-threshold=50
resilience4j.circuitbreaker.backends.menu.wait-duration-in-open-state=60000
resilience4j.circuitbreaker.backends.menu.sliding-window-size=5
resilience4j.circuitbreaker.backends.menu.permitted-number-of-calls-in-half-open-state=2
resilience4j.circuitbreaker.backends.menu.minimum-number-of-calls=2
稍微解釋一下這里的配置
failure-rate-threshold=50是說失敗率超過50%就熔斷,
wait-duration-in-open-state= 60000,是說熔斷后等待60S才允許再次呼叫,
sliding-window-size =5 可以理解為5個請求統計一次,
permitted-number-of-calls-in-half-open-state = 2是說進入半開的狀態的時候,還允許請求多少個,
minimum-number-of-calls=2是說最少有多少個請求才開始統計, 這里的引數都是我為了實驗設定的,實際情況根據需要進行調整,引數比較多,具體可以參加官方檔案
https://resilience4j.readme.io/docs/circuitbreaker
我們來看下實際的效果通過瀏覽器訪問,
首先我們現打開BookService,讓它有一次成功的請求,日志會輸出
CircuitBreaker 'menu' recorded a successful call.
然后我們將BookService關閉,讓它請求失敗,日志會輸出如下
CircuitBreaker 'menu' recorded an error: 'feign.RetryableException: Connection refused: no further information executing GET http://bookshop-service/book/getAll'. Elapsed time: 2050 ms
CircuitBreaker 'menu' exceeded failure rate threshold. Current failure rate: 50.0
CircuitBreaker 'menu' changed state from CLOSED to OPEN
可以看到斷路器已經打開了,
接著我們繼續訪問會出現,
CircuitBreaker 'menu' recorded a call which was not permitted.
這個時候請求不會打到BookService上面了,就算這個時候我們的BookService恢復正常,
等待60s后進入半Open的狀態
CircuitBreaker 'menu' changed state from OPEN to HALF_OPEN
這個時候恢復BookService正常,我們請求也會正常回應了
CircuitBreaker 'menu' recorded a successful call
多請求幾次,斷路器就從HALF_OPEN變成了CLOSED
CircuitBreaker 'menu' changed state from HALF_OPEN to CLOSED
這里給一個官方的狀態圖來說明

斷路器有三個狀態: CLOSED, OPEN, HALF_OPEN,
- CLOSED是最開始的狀態,也就是關閉狀態,流量可以正常通過,
- 當失敗比率超過threshold后,斷路器打開, 變成OPEN 打開后流量不可以通過;
- 等待一定的時間后,斷路器進入半開狀態 HALF_OPEN, 這個時候如果失敗率低于閾值,斷路器進入CLOSED狀態,如果超過閾值,斷路器繼續保證OPEN,再等待,如此往復,
斷路器現在還支持設定慢請求,使用起來還是比較方便,對于引數的設定如果不是很理解,可以通過單元測驗的方法來加深對它的理解,這里參考https://github.com/eugenp/tutorials/blob/master/libraries-6/src/test/java/com/baeldung/resilence4j/Resilience4jUnitTest.java 上面的例子,給出來個單元測驗
interface RemoteService {
int process(int i);
}
private RemoteService service;
@Test
public void whenCircuitBreakerIsUsed_thenItWorksAsExpected() {
service = mock(RemoteService.class);
CircuitBreakerConfig config = CircuitBreakerConfig.custom()
// Percentage of failures to start short-circuit
.failureRateThreshold(20)
.minimumNumberOfCalls(5)
.build();
CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
CircuitBreaker circuitBreaker = registry.circuitBreaker("my");
Function<Integer, Integer> decorated = CircuitBreaker.decorateFunction(circuitBreaker, service::process);
when(service.process(anyInt())).thenThrow(new RuntimeException());
circuitBreaker.getEventPublisher()
.onEvent(event ->
{
log.info(event.toString());
});
for (int i = 0; i < 10; i++) {
try {
decorated.apply(i);
} catch (Exception ignore) {
}
}
verify(service, times(5)).process(any(Integer.class));
}
這里設定最少請求5次,失敗率超過20%就熔斷,然后我們請求了10次,實際上只呼叫了Service5次,
對于其它引數,你可以調整后,根據需要來驗證是否符合預期,它的日志輸出如下
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 2 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' recorded an error: 'java.lang.RuntimeException'. Elapsed time: 0 ms
CircuitBreaker 'my' exceeded failure rate threshold. Current failure rate: 100.0
CircuitBreaker 'my' changed state from CLOSED to OPEN
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
CircuitBreaker 'my' recorded a call which was not permitted.
可以看到5次過后,就開始打開斷路器,后面的call就不被允許了,
隔艙Bulkhead
Resilience4j 里面的Bulkhead可以簡單的理解為允許多少個并發訪問,我們這里還是通過單元測驗的方法來演示它的功能
@Test
public void whenBulkheadIsUsed_thenItWorksAsExpected() throws InterruptedException {
service = mock(RemoteService.class);
BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(2).build();
BulkheadRegistry registry = BulkheadRegistry.of(config);
Bulkhead bulkhead = registry.bulkhead("my");
Function<Integer, Integer> decorated = Bulkhead.decorateFunction(bulkhead, service::process);
try {
callAndBlock(decorated);
}
catch(BulkheadFullException ex)
{
log.error("isfull");
}
finally
{
verify(service, times(2)).process(any(Integer.class));
}
}
private void callAndBlock(Function<Integer, Integer> decoratedService) throws InterruptedException {
when(service.process(anyInt())).thenAnswer(invocation -> {
log.info("service called");
return null;
});
ArrayList<Integer> numberList = new ArrayList<Integer>();
for(int i = 0;i<10;i++)
{
numberList.add(i);
}
numberList.parallelStream().forEach((i)->{
try {
decoratedService.apply(i);
}
catch (Exception ex)
{
log.error("meet error " + ex.getMessage());
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
}
首先我們解讀一下callAndBlock, 它會并發的去執行一個function. 如果我們不用隔艙,它的輸出會是這樣,
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-9] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-5] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-3] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-1] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-6] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-7] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-8] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.010+08:00 INFO 37276 --- [onPool-worker-2] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:22:52.011+08:00 INFO 37276 --- [ main] c.k.r.bookcustomer.Resilience4jUnitTest : service called
可以看到啟動了10個執行緒去訪問方法,加了隔艙后,隔艙限定了一次只能兩個,輸出如下
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-6] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-7] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-8] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-5] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-2] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.648+08:00 ERROR 32256 --- [onPool-worker-3] c.k.r.bookcustomer.Resilience4jUnitTest : meet error Bulkhead 'my' is full and does not permit further calls
2022-12-28T15:33:48.650+08:00 INFO 32256 --- [onPool-worker-1] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:33:48.650+08:00 INFO 32256 --- [ main] c.k.r.bookcustomer.Resilience4jUnitTest : service called
可以看到只有兩次成功的訪問,其它的訪問都被block了,
限速器RateLimiter
RateLimiter的功能是限定一段時間內允許多少次訪問,還是使用和Bulkhead一樣的例子一樣
@Test
public void whenRateLimiterInUse_thenItWorksAsExpected() throws InterruptedException {
service = mock(RemoteService.class);
RateLimiterConfig config = RateLimiterConfig.custom()
.limitRefreshPeriod(Duration.ofMillis(1000))
.limitForPeriod(4)
.timeoutDuration(Duration.ofMillis(25))
.build();
RateLimiterRegistry rateLimiterRegistry = RateLimiterRegistry.of(config);
RateLimiter rateLimiter = rateLimiterRegistry
.rateLimiter("name1");
CheckedFunction<Integer, Integer> decorated = RateLimiter
.decorateCheckedFunction(rateLimiter, service::process);
try {
callAndBlock(decorated);
}
catch(Exception ex)
{
log.error("isfull");
}
finally
{
verify(service, times(4)).process(any(Integer.class));
}
}
private void callAndBlock(CheckedFunction<Integer, Integer> decoratedService) throws InterruptedException {
when(service.process(anyInt())).thenAnswer(invocation -> {
log.info("service called");
return null;
});
ArrayList<Integer> numberList = new ArrayList<Integer>();
for(int i = 0;i<10;i++)
{
numberList.add(i);
}
numberList.parallelStream().forEach((i)->{
try {
decoratedService.apply(i);
}
catch (Exception ex)
{
log.error("meet error " + ex.getMessage());
} catch (Throwable e) {
throw new RuntimeException(e);
}
});
}
我們這里故意設定1S中允許訪問4次,實際的運行情況也是只允許了4次,日志輸出如下
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [onPool-worker-2] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [onPool-worker-5] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [ main] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.027+08:00 INFO 35236 --- [onPool-worker-7] c.k.r.bookcustomer.Resilience4jUnitTest : service called
2022-12-28T15:39:52.053+08:00 ERROR 35236 --- [onPool-worker-6] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-3] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-9] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-1] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.060+08:00 ERROR 35236 --- [onPool-worker-8] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
2022-12-28T15:39:52.075+08:00 ERROR 35236 --- [onPool-worker-4] c.k.r.bookcustomer.Resilience4jUnitTest : meet error RateLimiter 'name1' does not permit further calls
限速器這個功能只能限制在整體性能上面,如果要限制某個用戶,只能某段時間訪問多少次,它就做不到了,
Relilience4j 里面還提供了Retry,TimeLimiter,Cache. 感覺不是很有必要的功能, Retry在spring里面有相應的功能了,沒有必要專門為了使用它而多加個包, TimeLimiter,Cache 我感覺不是很受重視的功能,連例子檔案都懶得提供,可見意義不大,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/540875.html
標籤:其他
