Spring Boot2+Resilience4j實現容錯之Bulkhead
Resilience4j是一個輕量級、易於使用的容錯庫,其靈感來自Netflix Hystrix,但專為Java 8和函數式編程設計。輕量級,因為庫只使用Vavr,它沒有任何其他外部庫依賴項。相比之下,Netflix Hystrix對Archaius有一個編譯依賴關係,Archaius有更多的外部庫依賴關係,如Guava和Apache Commons。
Resilience4j提供高階函數(decorators)來增強任何功能接口、lambda表達式或方法引用,包括斷路器、速率限制器、重試或艙壁。可以在任何函數接口、lambda表達式或方法引用上使用多個裝飾器。優點是您可以選擇所需的裝飾器,而無需其他任何東西。
有了Resilience4j,你不必全力以赴,你可以選擇你需要的。
https://resilience4j.readme.io/docs/getting-started
概覽
Resilience4j提供了兩種艙壁模式(Bulkhead),可用於限制併發執行的次數:
- SemaphoreBulkhead(信號量艙壁,默認),基於Java併發庫中的Semaphore實現。
- FixedThreadPoolBulkhead(固定線程池艙壁),它使用一個有界隊列和一個固定線程池。
本文將演示在Spring Boot2中集成Resilience4j庫,以及在多併發情況下實現如上兩種艙壁模式。
引入依賴
在Spring Boot2項目中引入Resilience4j相關依賴
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-spring-boot2</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>io.github.resilience4j</groupId>
<artifactId>resilience4j-bulkhead</artifactId>
<version>1.4.0</version>
</dependency>
由於Resilience4j的Bulkhead依賴於Spring AOP,所以我們需要引入Spring Boot AOP相關依賴
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
我們可能還希望了解Resilience4j在程序中的運行時狀態,所以需要通過Spring Boot Actuator將其暴露出來
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
實現SemaphoreBulkhead(信號量艙壁)
resilience4j-spring-boot2實現了對resilience4j的自動配置,因此我們僅需在項目中的yml/properties文件中編寫配置即可。
SemaphoreBulkhead的配置項如下:
屬性配置 | 默認值 | 含義 |
---|---|---|
maxConcurrentCalls | 25 | 艙壁允許的最大并行執行量 |
maxWaitDuration | 0 | 嘗試進入飽和艙壁時,應阻塞線程的最長時間。 |
添加配置
示例(使用yml):
resilience4j.bulkhead:
configs:
default:
maxConcurrentCalls: 5
maxWaitDuration: 20ms
instances:
backendA:
baseConfig: default
backendB:
maxWaitDuration: 10ms
maxConcurrentCalls: 20
如上,我們配置了SemaphoreBulkhead的默認配置為maxConcurrentCalls: 5,maxWaitDuration: 20ms
。並在backendA實例上應用了默認配置,而在backendB實例上使用自定義的配置。這裏的實例可以理解為一個方法/lambda表達式等等的可執行單元。
編寫Bulkhead邏輯
定義一個受SemaphoreBulkhead管理的Service類:
@Service
public class BulkheadService {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@Autowired
private BulkheadRegistry bulkheadRegistry;
@Bulkhead(name = "backendA")
public JsonNode getJsonObject() throws InterruptedException {
io.github.resilience4j.bulkhead.Bulkhead.Metrics metrics = bulkheadRegistry.bulkhead("backendA").getMetrics();
logger.info("now i enter the method!!!,{}<<<<<<{}", metrics.getAvailableConcurrentCalls(), metrics.getMaxAllowedConcurrentCalls());
Thread.sleep(1000L);
logger.info("now i exist the method!!!");
return new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis());
}
}
如上,我們將@Bulkhead
註解放到需要管理的方法上面。並且通過name
屬性指定該方法對應的Bulkhead實例名字(這裏我們指定的實例名字為backendA,所以該方法將會利用默認的配置)。
定義接口類:
@RestController
public class BulkheadResource {
@Autowired
private BulkheadService bulkheadService;
@GetMapping("/json-object")
public ResponseEntity<JsonNode> getJsonObject() throws InterruptedException {
return ResponseEntity.ok(bulkheadService.getJsonObject());
}
}
編寫測試:
首先添加測試相關依賴
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<version>3.0.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<scope>test</scope>
</dependency>
這裏我們使用rest-assured和awaitility編寫多併發情況下的API測試
public class SemaphoreBulkheadTests extends Resilience4jDemoApplicationTests {
@LocalServerPort
private int port;
@BeforeEach
public void init() {
RestAssured.baseURI = "http://localhost";
RestAssured.port = port;
}
@Test
public void 多併發訪問情況下的SemaphoreBulkhead測試() {
CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
statusList.add(given().get("/json-object").statusCode());
}
));
await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
System.out.println(statusList);
assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(5);
assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(3);
}
}
可以看到所有請求中只有前五個順利通過了,其餘三個都因為超時而導致接口報500異常。我們可能並不希望這種不友好的提示,因此Resilience4j提供了自定義的失敗回退方法。當請求併發量過大時,無法正常執行的請求將進入回退方法。
首先我們定義一個回退方法
private JsonNode fallback(BulkheadFullException exception) {
return new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis());
}
注意:回退方法應該和調用方法放置在同一類中,並且必須具有相同的方法簽名,並且僅帶有一個額外的目標異常參數。
然後在@Bulkhead
註解中指定回退方法:@Bulkhead(name = "backendA", fallbackMethod = "fallback")
最後修改API測試代碼:
@Test
public void 多併發訪問情況下的SemaphoreBulkhead測試使用回退方法() {
CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
statusList.add(given().get("/json-object").statusCode());
}
));
await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
System.out.println(statusList);
assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}
運行單元測試,成功!可以看到,我們定義的回退方法,在請求過量時起作用了。
實現FixedThreadPoolBulkhead(固定線程池艙壁)
FixedThreadPoolBulkhead的配置項如下:
配置名稱 | 默認值 | 含義 |
---|---|---|
maxThreadPoolSize | Runtime.getRuntime().availableProcessors() |
配置最大線程池大小 |
coreThreadPoolSize | Runtime.getRuntime().availableProcessors() - 1 |
配置核心線程池大小 |
queueCapacity | 100 | 配置隊列的容量 |
keepAliveDuration | 20ms | 當線程數大於核心時,這是多餘空閑線程在終止前等待新任務的最長時間 |
添加配置
示例(使用yml):
resilience4j.thread-pool-bulkhead:
configs:
default:
maxThreadPoolSize: 4
coreThreadPoolSize: 2
queueCapacity: 2
instances:
backendA:
baseConfig: default
backendB:
maxThreadPoolSize: 1
coreThreadPoolSize: 1
queueCapacity: 1
如上,我們定義了一段簡單的FixedThreadPoolBulkhead配置,我們指定的默認配置為:maxThreadPoolSize: 4,coreThreadPoolSize: 2,queueCapacity: 2
,並且指定了兩個實例,其中backendA使用了默認配置而backendB使用了自定義的配置。
編寫Bulkhead邏輯
定義一個受FixedThreadPoolBulkhead管理的方法:
@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL)
public CompletableFuture<JsonNode> getJsonObjectByThreadPool() throws InterruptedException {
io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
logger.info("now i enter the method!!!,{}", metrics);
Thread.sleep(1000L);
logger.info("now i exist the method!!!");
return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}
如上定義和SemaphoreBulkhead的方法大同小異,其中@Bulkhead
显示指定了type的屬性為Bulkhead.Type.THREADPOOL
,表明其方法受FixedThreadPoolBulkhead管理。由於@Bulkhead
默認的Bulkhead是SemaphoreBulkhead,所以在未指定type的情況下為SemaphoreBulkhead。另外,FixedThreadPoolBulkhead只對CompletableFuture方法有效,所以我們必創建返回CompletableFuture類型的方法。
定義接口類方法
@GetMapping("/json-object-with-threadpool")
public ResponseEntity<JsonNode> getJsonObjectWithThreadPool() throws InterruptedException, ExecutionException {
return ResponseEntity.ok(bulkheadService.getJsonObjectByThreadPool().get());
}
編寫測試代碼
@Test
public void 多併發訪問情況下的ThreadPoolBulkhead測試() {
CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
statusList.add(given().get("/json-object-with-threadpool").statusCode());
}
));
await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
System.out.println(statusList);
assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(6);
assertThat(statusList.stream().filter(i -> i == 500).count()).isEqualTo(2);
}
測試中我們并行請求了8次,其中6次請求成功,2次失敗。根據FixedThreadPoolBulkhead的默認配置,最多能容納maxThreadPoolSize+queueCapacity次請求(根據我們上面的配置為6次)。
同樣,我們可能並不希望這種不友好的提示,那麼我們可以指定回退方法,在請求無法正常執行時使用回退方法。
private CompletableFuture<JsonNode> fallbackByThreadPool(BulkheadFullException exception) {
return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("errorFile", System.currentTimeMillis()));
}
@Bulkhead(name = "backendA", type = Bulkhead.Type.THREADPOOL, fallbackMethod = "fallbackByThreadPool")
public CompletableFuture<JsonNode> getJsonObjectByThreadPoolWithFallback() throws InterruptedException {
io.github.resilience4j.bulkhead.ThreadPoolBulkhead.Metrics metrics = threadPoolBulkheadRegistry.bulkhead("backendA").getMetrics();
logger.info("now i enter the method!!!,{}", metrics);
Thread.sleep(1000L);
logger.info("now i exist the method!!!");
return CompletableFuture.supplyAsync(() -> new ObjectMapper().createObjectNode().put("file", System.currentTimeMillis()));
}
編寫測試代碼
@Test
public void 多併發訪問情況下的ThreadPoolBulkhead測試使用回退方法() {
CopyOnWriteArrayList<Integer> statusList = new CopyOnWriteArrayList<>();
IntStream.range(0, 8).forEach(i -> CompletableFuture.runAsync(() -> {
statusList.add(given().get("/json-object-by-threadpool-with-fallback").statusCode());
}
));
await().atMost(1, TimeUnit.MINUTES).until(() -> statusList.size() == 8);
System.out.println(statusList);
assertThat(statusList.stream().filter(i -> i == 200).count()).isEqualTo(8);
}
由於指定了回退方法,所有請求的響應狀態都為正常了。
總結
本文首先簡單介紹了Resilience4j的功能及使用場景,然後具體介紹了Resilience4j中的Bulkhead。演示了如何在Spring Boot2項目中引入Resilience4j庫,使用代碼示例演示了如何在Spring Boot2項目中實現Resilience4j中的兩種Bulkhead(SemaphoreBulkhead和FixedThreadPoolBulkhead),並編寫API測試驗證我們的示例。
本文示例代碼地址:https://github.com/cg837718548/resilience4j-demo
歡迎訪問筆者博客:blog.dongxishaonian.tech
關注筆者公眾號,推送各類原創/優質技術文章 ⬇️
本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理
【其他文章推薦】
※自行創業缺乏曝光? 網頁設計幫您第一時間規劃公司的形象門面
※網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!
※想知道最厲害的網頁設計公司"嚨底家"!
※幫你省時又省力,新北清潔一流服務好口碑
※別再煩惱如何寫文案,掌握八大原則!