上篇文章我們講到了Reactive Feign,它在回應式微服務中替換了阻塞模型的Open Feign,今天我們要討論的是回應式的鏈路跟蹤,在微服務架構中,原來的單體被劃分為多個細小的模塊部署,一個請求可能需要橫跨多個小的服務才能完成它所要實作的功能,在傳統阻塞模型中有很多優秀的框架可以供我們解決這個問題,比如Zipkin,SkyWalking等等,那在Spring Webflux中他們表現如何呢?
Zipkin,SkyWalking對Spring WebFlux的支持
Zipkin
Zipkin是在Spring Cloud中是使用 Spring Cloud Sleuth進行鏈路跟蹤的,使用Spring的組件當然會對Spring自己家的產品有很好的支持,所以Zipkin對于Spring WebFlux是支持的,
SkyWalking
SkyWalking是使用探針的方式進行鏈路跟蹤,具體的原理還沒有仔細了解過,所以就直接去GitHub的issue中看它的Spring WebFlux的支持吧,看下圖確實在Spring WebFlux中使用SkyWalking有一些問題,作者對關于Spring WebFlux的問題的處理態度也是:


我想表達的意思只是SkyWalking對于Spring Webflux的支持可能不是那么完善,作者的精力也是有限的,如果你想讓SkyWalking對Spring WebFlux支持更加友好那么你可以做這方面的作業,所以,如果現在先在Spring WebFlux中使用SkyWaking還是要觀望一些,
那么看到這里是不是覺得Zipkin就是我們的唯一選擇了?最后筆者也沒有選用Zipkin,原因就是我為什么要使用Zipkin?鏈路跟蹤的意義是知道一個請求在各個服務之間呼叫的情況,避免出現問題不知道是哪個服務的問題,僅此而已,在微服務架構中,除了這個問題還有像日志統一管理等各種各樣的問題,不可能為了一個問題引入一個框架,導致系統的維護成本增加,所以最后決定自己去實作一個簡單的鏈路跟蹤,
傳統模式Spring MVC下實作
我們可以在每次請求到達網關時生成一個唯一請求id,這個請求到每個服務時都會攜帶這個id,在日志列印時將這個id列印出來,這樣我們就可以清楚的知道每個請求的鏈路了,這個方案中我們有幾個問題要解決:
- 通過什么方式生成唯一請求id?
- 怎么將這個id攜帶到每一個服務中?
- 日志列印需要每次自己手動列印嗎?
前兩個問題我會在后面實作Spring WebFlux鏈路跟蹤中進行解答,
第三個問題是不需要自己手動每次列印,設定好slf4j的列印格式就可以像列印執行緒名,日期一樣自動列印了,具體實作需要使用slf4j的MDC,像下面那樣設定,
MDC.put("mdc_trace_id", traceId);
這樣在攔截器中將每次請求的id設定進MDC就完成了第一步,接著再日志格式配置的地方添加%X{trace_id}就可以了,
pattern>%d{HH:mm:ss.SSS} [%-5p] [Thread: %t] %X{trace_id} %c:%L - %m%n</pattern>
在傳統模式下實作鏈路跟蹤還是很容易的,最后你需要做的就是將這些日志進行集中管理,方便查看,
回應式Spring WebFlux下實作
在回應式中的實作方案和傳統Spring MVC下一樣,也是每次請求到達網關時生成一個唯一請求id,這個請求到每個服務時都會攜帶這個id,在日志列印時將這個id列印出來,所以它也會有如下的幾個問題:
- 通過什么方式生成唯一請求id?
- 怎么將這個id攜帶到每一個服務中?
- 日志列印需要每次自己手動列印嗎?
通過什么方式生成唯一的請求id?
在Spring WebFlux官方檔案中有提到log id,
Log Id
In WebFlux, a single request can be run over multiple threads and the thread ID is not useful for correlating log messages that belong to a specific request. This is why WebFlux log messages are prefixed with a request-specific ID by default.
On the server side, the log ID is stored in the
ServerWebExchangeattribute (LOG_ID_ATTRIBUTE), while a fully formatted prefix based on that ID is available fromServerWebExchange#getLogPrefix(). On theWebClientside, the log ID is stored in theClientRequestattribute (LOG_ID_ATTRIBUTE) ,while a fully formatted prefix is available fromClientRequest#logPrefix().
官方對于這個id功能的解釋是由于WebFlux在處理一個請求時會涉及多次執行緒的切換,所以執行緒id對于一個請求它的日志資訊關聯的作用就不大了(在spring MVC中基本很少發生執行緒切換,執行緒id就可以關聯到這個請求的所有日志),這個log id保存在ServerWebExchange中,在WebClient中也有保存,
我們如果使用這個log id作為請求的唯一id,那它有沒有唯一性的保證,不會是每次都是從1開始然后開始自增的吧?我們可以去原始碼中看看log id的生成規則,
在org.springframework.web.server.adapter.DefaultServerWebExchange的構造方法中對ServerWebExchange.LOG_ID_ATTRIBUTE設定初始值,
this.attributes.put(ServerWebExchange.LOG_ID_ATTRIBUTE, request.getId());
@Override
@Nullable
protected String initId() {
if (this.request instanceof Connection) {
return ((Connection) this.request).channel().id().asShortText() +
"-" + logPrefixIndex.incrementAndGet();
}
return null;
}
@Override
public String asShortText() {
String shortValue = this.shortValue;
if (shortValue == null) {
this.shortValue = shortValue = ByteBufUtil.hexDump(data, data.length - RANDOM_LEN, RANDOM_LEN);
}
return shortValue;
}
--data的生成規則,機器mac號-行程號-序列號-亂數
data = new byte[MACHINE_ID.length + PROCESS_ID_LEN + SEQUENCE_LEN + TIMESTAMP_LEN + RANDOM_LEN];
int i = 0;
// machineId
System.arraycopy(MACHINE_ID, 0, data, i, MACHINE_ID.length);
i += MACHINE_ID.length;
// processId
i = writeInt(i, PROCESS_ID);
// sequence
i = writeInt(i, nextSequence.getAndIncrement());
// timestamp (kind of)
i = writeLong(i, Long.reverse(System.nanoTime()) ^ System.currentTimeMillis());
// random
int random = PlatformDependent.threadLocalRandom().nextInt();
i = writeInt(i, random);
assert i == data.length;
hashCode = Arrays.hashCode(data);
我看可以看一個大概,它確實是有使用自增,但是不是從1開始的,log id使用的request的id,request的id又使用的是channel id,這里的channel是netty中的channel,最后,我們可以看到channel id的生成需要使用到機器號,行程號,亂數,序列號,時間戳等,然后進行縮短再加一個自增數,可以說在一個微服務體系中重復的概率很低,在構成中有時間戳不會導致后面的序號和前面重復,機器號和行程號保證了不會和其他服務重復,所以log id完全可以用來作為請求的id(Spring WebFlux也確是使用它作為請求id),
怎么將這個id攜帶到每一個服務中?
我們可以將log id放入Spring WebFlux背景關系中(背景關系怎么使用請看這篇),然后自動將它加入到Reactive Feign的頭中(Reactive Feign請看這篇),在被呼叫方的攔截器中再將log id放入到背景關系中,并設定到ServerWebExchange的ServerWebExchange.LOG_ID_ATTRIBUTE屬性中,將它自動生成的覆寫掉,這樣就完成了log id的傳遞,
日志列印需要每次自己手動列印嗎?
在Spring WebFlux中我們可以使用slf4j的MDC嗎?可以,也不可以,在攔截器中我們可以將log id設定到MDC,但是由于MDC是使用ThreadLocal實作的,所以它只對當前執行緒有用,如果請求發生執行緒切換那就會打不出trace_id,同時也記得請求結束后清除MDC的trace_id,所以MDC在這里就顯得有點雞肋了,建議在重點的地方(比如例外)需要記錄trace_id便于排除問題和用戶行為分析的地方,手動的從背景關系中或exchange.getLogPrefix()獲取列印,
代碼實作
上面說了這么多,可能看的有點頭暈,下面展示一下具體的代碼實作,
Spring cloud Gateway中的攔截器:
/**
* @description:
* @author: lc
* @createDate: 2021/2/14
*/
@Component
@Slf4j
public class OutLogFilter implements GlobalFilter, Ordered {
private static final String REQUEST_TIME_BEGIN = "requestTimeBegin";
@Override
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());
--設定log id到請求頭
Consumer<HttpHeaders> httpHeaders = httpHeader -> {
httpHeader.set(SystemConstant.TRACE_ID, exchange.getAttribute(ServerWebExchange.LOG_ID_ATTRIBUTE));
};
ServerHttpRequest serverHttpRequest = exchange.getRequest().mutate().headers(httpHeaders).build();
exchange.mutate().request(serverHttpRequest).build();
return chain.filter(exchange).subscriberContext(ctx -> {
log.info("設定log{}", exchange.getLogPrefix());
return ctx.put(SystemConstant.TRACE_ID, exchange.getAttribute(ServerWebExchange.LOG_ID_ATTRIBUTE));
}).onErrorResume(error -> {
return deal(exchange).then(Mono.error(error));
}).then(deal(exchange));
}
--列印請求日志
private Mono deal(ServerWebExchange exchange) {
return Mono.fromRunnable(() -> {
Long startTime = exchange.getAttribute(REQUEST_TIME_BEGIN);
if (startTime != null) {
StringBuilder sb = new StringBuilder("請求地址:")
.append(exchange.getRequest().getURI().getRawPath())
.append(" 耗時: ")
.append(System.currentTimeMillis() - startTime)
.append("ms");
DataBuffer dataBuffer = (DataBuffer) exchange.getAttributes()
.get(ServerWebExchangeUtils.CACHED_REQUEST_BODY_ATTR);
if (dataBuffer != null) {
String s = dataBuffer.toString(Charset.forName("UTF-8"));
sb.append(" body:").append(s);
}
sb.append(" params:").append(exchange.getRequest().getQueryParams());
exchange.getResponse();
log.info(exchange.getLogPrefix() + sb.toString());
}
});
}
@Override
public int getOrder() {
return -100000;
}
}
被呼叫方攔截器 :
@Component
@Slf4j
@RequiredArgsConstructor
@ConditionalOnMissingBean(name = "outAuthFilter")
public class LogFilter implements OrderedWebFilter {
private static final String REQUEST_TIME_BEGIN = "requestTimeBegin";
@Override
public int getOrder() {
return -200;
}
@Override
public Mono<Void> filter(ServerWebExchange exchange, WebFilterChain chain) {
String headTraceId = exchange.getRequest().getHeaders().getFirst(SystemConstant.TRACE_ID);
log.info("獲取頭的log id .{}", headTraceId);
final String traceId;
if (headTraceId == null) {
traceId = exchange.getAttribute(ServerWebExchange.LOG_ID_ATTRIBUTE);
} else {
traceId = headTraceId;
}
exchange.getAttributes().put(REQUEST_TIME_BEGIN, System.currentTimeMillis());
exchange.getAttributes().put(LOG_ID_ATTRIBUTE, traceId);
return chain.filter(exchange).subscriberContext(ctx -> {
log.info("設定log:{}", traceId);
return ctx.put(SystemConstant.TRACE_ID, traceId);
});
}
}
總結與展望
本文描述了實作鏈路跟蹤的方式,目前來說還沒有很優雅的方式像阻塞模型MDC那樣自動的列印log id,還是需要手動的方式去處理,相信后面一些日志框架會增加回應式這方面的特性,將log id列印出來后,我們可以使用日志收集器將日志匯集到一起,然后分析日志,這里筆者考慮后面使用logstash+clickhouse來實作,
感謝閱讀,希望對你有幫助,
參考資料
https://docs.spring.io/spring-framework/docs/current/reference/html/web-reactive.html
https://azizulhaq-ananto.medium.com/how-to-handle-logs-and-tracing-in-spring-webflux-and-microservices-a0b45adc4610
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/283071.html
標籤:其他
上一篇:Nacos(配置中心)實踐
下一篇:反碎片技術和虛擬可移動區域
