一 . 前言
檔案目的
- 梳理 Gateway 生產中轉發請求的細節
- 梳理 轉發的定制點
知識補充
請求轉發是 Gateway 最核心的功能之一 , 他涉及到三個主要的概念 :
Route(路由): 路由是網關的基本單元,由ID、URI、一組Predicate、一組Filter組成,如果 Predicate 匹配 True ,則進行轉發
Predicate(謂語、斷言): 路由轉發的判斷條件,這是一個 Java 8函式斷言, 輸入型別是 Spring Framework ServerWebExchange , 目前SpringCloud Gateway支持多種方式,常見如:Path、Query、Method、Header等,寫法必須遵循 key=vlue的形式
Filter(過濾器): 過濾器是路由轉發請求時所經過的過濾邏輯,使用特定工廠構建的 GatewayFilter 實體 , 可用于修改請求、回應內容
二 . 簡單使用
2.1 predicates 匯總
//
- After=2017-01-20T17:42:47.789-07:00[America/Denver]
//
- Before=2017-01-20T17:42:47.789-07:00[America/Denver]
//
- Between=2017-01-20T17:42:47.789-07:00[America/Denver], 2017-01-21T17:42:47.789-07:00[America/Denver]
//
- Cookie=chocolate, ch.p
復制代碼
2.2 Mono 和 Flux
Mono 和 Flux 是貫穿了整個流程的核心物件 ,根據 reactive-streams 規范,發布服務器提供了數量可能無限的有序元素,并根據從其訂閱服務器接收到的需求發布這些元素,Reactor-core 有一組此 Publisher 介面的實作,我們將要創建序列的兩個重要實作是 Mono 和 Flux,
- Flux 表示的是包含 0 到 N 個元素的異步序列
- Mono 表示的是包含 0 或 1 個元素的異步序列
> SpringGateway 是使用 webflux 作為底層呼叫框架的 , 其中涉及到 mono 和 Flux 物件
> 該序列中可以包含 3 種通知 :
- 正常的包含元素的訊息
- 序列結束的訊息
- 序列出錯的訊息
Flux
- Flux是一個標準Publisher,表示0到N個發射項的異步序列,選擇性的以完成或錯誤信號終止,與Reactive Streams規范中一樣,這三種型別的信號轉換為對下游訂閱者的onNext、onComplete或onError方法的呼叫,

Mono
- Mono 是 Publisher 的另一個實作,它最多發出一個條目,然后(可選)以 onComplete 信號或 one rror 信號終止 , Mono 在本質上也是異步的
- 它只提供了可用于Flux的運算子的子集,并且一些運算子(特別是那些將Mono與另一個發布者組合的運算子)切換到Flux,
- 例如,Mono#concatWith(Publisher)回傳一個Flux ,而Mono#then(Mono)則回傳另一個Mono,

常見的方法如下 :
- create : 以編程方式創建具有多次發射能力的Flux,
- empty : 發出0元素或回傳空 Flux < t >
- just : 創建一個基礎
- error : 創建一個Flux,它在訂閱之后立即以指定的錯誤終止
PS : 這一塊就不深入看了 , 先看完 Gateway 的主流程
三 . 攔截深入
3.1 原理圖
首先來看一下 SpringGateway 的原理圖

四 . 呼叫的入口
4.1 呼叫流程
- Step 1 : HttpWebHandlerAdapter # handle : 構建 ServerWebExchange , 發起 Handler 處理
- Step 2 : DispatcherHandler # handle : 發起請求處理
- Step 3 : RoutePredicateHandlerMapping # getHandlerInternal : route 判斷處理
4.2. getHandlerInternal 邏輯
protected Mono<?> getHandlerInternal(ServerWebExchange exchange) {
// don't handle requests on management port if set and different than server port
if (this.managementPortType == DIFFERENT && this.managementPort != null
&& exchange.getRequest().getURI().getPort() == this.managementPort) {
return Mono.empty();
}
exchange.getAttributes().put(GATEWAY_HANDLER_MAPPER_ATTR, getSimpleName());
return lookupRoute(exchange)
// .log("route-predicate-handler-mapping", Level.FINER) //name this
.flatMap((Function<Route, Mono<?>>) r -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isDebugEnabled()) {
logger.debug("Mapping [" + getExchangeDesc(exchange) + "] to " + r);
}
exchange.getAttributes().put(GATEWAY_ROUTE_ATTR, r);
return Mono.just(webHandler);
}).switchIfEmpty(Mono.empty().then(Mono.fromRunnable(() -> {
exchange.getAttributes().remove(GATEWAY_PREDICATE_ROUTE_ATTR);
if (logger.isTraceEnabled()) {
logger.trace("No RouteDefinition found for [" + getExchangeDesc(exchange) + "]");
}
})));
}
復制代碼
3.2. lookupRoute
protected Mono<Route> lookupRoute(ServerWebExchange exchange) {
return this.routeLocator.getRoutes()
// individually filter routes so that filterWhen error delaying is not a
// problem
.concatMap(route -> Mono.just(route).filterWhen(r -> {
// add the current route we are testing
exchange.getAttributes().put(GATEWAY_PREDICATE_ROUTE_ATTR, r.getId());
return r.getPredicate().apply(exchange);
})
// instead of immediately stopping main flux due to error, log and
// swallow it
.doOnError(e -> logger.error("Error applying predicate for route: " + route.getId(), e))
.onErrorResume(e -> Mono.empty()))
// .defaultIfEmpty() put a static Route not found
// or .switchIfEmpty()
// .switchIfEmpty(Mono.<Route>empty().log("noroute"))
.next()
// TODO: error handling
.map(route -> {
validateRoute(route, exchange);
return route;
});
}
復制代碼
會遍歷所有的 route

五. 發送的流程
5.1 FilteringWebHandler 體系
此處的 webHandler 為 FilteringWebHandler 物件 , 來看一下這個物件的作用

這里涉及到以下的 Filter :
- C- ForwardPathFilter :
- C- ForwardRoutingFilter : 用來做本地forward的
- C- GatewayMetricsFilter : 與 Prometheus 整合,從而創建一個 Grafana dashboard
- C- LoadBalancerClientFilter : 用來整合Ribbon的 , 先獲取微服務的名稱,然后再通過Ribbon獲取實際的呼叫地址
- C- NettyRoutingFilter : http 或 https ,使用 Netty 的
HttpClient向下游的服務發送代理請求 - C- NettyWriteResponseFilter : 用于將代理回應寫回網關的客戶端側,所以該過濾器會在所有其他過濾器執行完成后才執行
- C- OrderedGatewayFilter :
- C- RouteToRequestUrlFilter : 將從request里獲取的 原始url轉換成Gateway進行請求轉發時所使用的url
- C- WebClientHttpRoutingFilter :
- C- WebClientWriteResponseFilter :
- C- WebsocketRoutingFilter : ws 或者 wss,那么該Filter會使用 Spring Web Socket 將 Websocket 請求轉發到下游
- C- WeightCalculatorWebFilter :
呼叫邏輯1 : FilteringWebHandler 管理
該物件中存在一個內部類 DefaultGatewayFilterChain , 該類為 Filter 過濾鏈
private static class DefaultGatewayFilterChain implements GatewayFilterChain {
// 當前 Filter 鏈索引
private final int index;
// Filter 集合
private final List<GatewayFilter> filters;
DefaultGatewayFilterChain(List<GatewayFilter> filters) {
this.filters = filters;
this.index = 0;
}
private DefaultGatewayFilterChain(DefaultGatewayFilterChain parent, int index) {
this.filters = parent.getFilters();
this.index = index;
}
public List<GatewayFilter> getFilters() {
return filters;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange) {
return Mono.defer(() -> {
if (this.index < filters.size()) {
// 逐個 Filter 過濾呼叫
GatewayFilter filter = filters.get(this.index);
DefaultGatewayFilterChain chain = new DefaultGatewayFilterChain(this,
this.index + 1);
return filter.filter(exchange, chain);
}
else {
return Mono.empty(); // complete
}
});
}
}
復制代碼
呼叫流程 3 : Filter 過濾
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 通常判斷部分條件 , 如果該 Filter 不符合 , 則跳過該 Filter
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
復制代碼
5.2 發送的主體
核心的發送 Filter 是 NettyRoutingFilter, 下面只關注這個 Filter 的相關邏輯 :
C- NettyRoutingFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
// 請求 URL : http://httpbin.org:80/get
URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
// 協議型別 : http
String scheme = requestUrl.getScheme();
// Step 1 : filter 鏈處理 ,如果不符合 http 協議 , 就通過下一個 Filter 處理
if (isAlreadyRouted(exchange)
|| (!"http".equals(scheme) && !"https".equals(scheme))) {
return chain.filter(exchange);
}
// Step 2 : 標識 Routed 已處理
setAlreadyRouted(exchange);
// Step 3 : 獲取 Request 請求物件 , 這個是外部請求的物件
ServerHttpRequest request = exchange.getRequest();
// Step 4 : 獲取 Method 型別 (get/post...)
final HttpMethod method = HttpMethod.valueOf(request.getMethodValue());
final String url = requestUrl.toString();
// Step 5 : 對 Header 進行處理 , 需要轉發過去
HttpHeaders filtered = filterRequest(getHeadersFilters(), exchange);
final DefaultHttpHeaders httpHeaders = new DefaultHttpHeaders();
filtered.forEach(httpHeaders::set);
// -> Transfer-Encoding
String transferEncoding = request.getHeaders().getFirst(HttpHeaders.TRANSFER_ENCODING);
boolean chunkedTransfer = "chunked".equalsIgnoreCase(transferEncoding);
// -> preserveHostHeader
boolean preserveHost = exchange.getAttributeOrDefault(PRESERVE_HOST_HEADER_ATTRIBUTE, false);
// 通過 netty httpClient 發起轉發請求 , PS !!! 這里是異步的
Flux<HttpClientResponse> responseFlux = this.httpClient
.chunkedTransfer(chunkedTransfer).request(method).uri(url)
.send((req, nettyOutbound) -> {
// Step 6 : 轉發 Header
req.headers(httpHeaders);
// => 是否需要記錄之前的 host
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
req.header(HttpHeaders.HOST, host);
}
// Step 7 : 真正發起請求
return nettyOutbound.options(NettyPipeline.SendOptions::flushOnEach)
.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).responseConnection((res, connection) -> {
// Step 8 : 請求完成 , 獲取 response
ServerHttpResponse response = exchange.getResponse();
// Step 9 : 轉發headers 和 status 等屬性
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
// => String CONTENT_TYPE = "Content-Type"
// => String ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR = "original_response_content_type";
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
// 轉發狀態 , 存在往 GatewayResponse 設定狀態
HttpStatus status = HttpStatus.resolve(res.status().code());
if (status != null) {
response.setStatusCode(status);
}
else if (response instanceof AbstractServerHttpResponse) {
((AbstractServerHttpResponse) response)
.setStatusCodeValue(res.status().code());
}
else {
throw new IllegalStateException(
"Unable to set status code on response: "
+ res.status().code() + ", "
+ response.getClass());
}
// 確保 Header filter 在設定狀態后運行, 校驗 header 中 filter 正常
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);
// String TRANSFER_ENCODING = "Transfer-Encoding"
// String CONTENT_LENGTH = "Content-Length"
if (!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// content-length 存在需要去掉 Transfer-Encoding
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
// 延遲提交回應,直到所有路由過濾器都運行
// 將客戶端回應作為ServerWebExchange屬性,稍后寫入回應NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
return Mono.just(res);
});
if (properties.getResponseTimeout() != null) {
// 超時例外處理
responseFlux = responseFlux.timeout(properties.getResponseTimeout(),
Mono.error(new TimeoutException("Response took longer than timeout: "
+ properties.getResponseTimeout())))
.onErrorMap(TimeoutException.class,
// GATEWAY_TIMEOUT(504, "Gateway Timeout")
th -> new ResponseStatusException(HttpStatus.GATEWAY_TIMEOUT,
th.getMessage(), th));
}
return responseFlux.then(chain.filter(exchange));
}
復制代碼
5.3 回傳 Response
C- NettyWriteResponseFilter
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
return chain.filter(exchange).then(Mono.defer(() -> {
// Step 1 : 獲取 GatewayRequest
Connection connection = exchange.getAttribute(CLIENT_RESPONSE_CONN_ATTR);
// 連接不存在直接回傳空
if (connection == null) {
return Mono.empty();
}
// Step 2 : 獲取 GatewayResponse
ServerHttpResponse response = exchange.getResponse();
NettyDataBufferFactory factory = (NettyDataBufferFactory) response
.bufferFactory();
// 此處主要包含一個 byteBufflux
final Flux<NettyDataBuffer> body = connection.inbound().receive().retain()
.map(factory::wrap);
// 媒體型別
MediaType contentType = null;
try {
contentType = response.getHeaders().getContentType();
}
catch (Exception e) {
log.trace("invalid media type", e);
}
return (isStreamingMediaType(contentType)
? response.writeAndFlushWith(body.map(Flux::just))
: response.writeWith(body));
}));
}
復制代碼
總結
由于 netty 的底層了解的還不是很清楚 , 對于一些呼叫程序沒辦法輸出資料看 , 這篇文章心里也不是很有底 , 后續深入后再來補充細節
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/293849.html
標籤:其他
