前言
有些時候我們在線上出現一些bug, 無法定位到真正的請求資料是什么, 其實單純的將日志打到控制臺并不利于我們排錯, 有些時候可能需要完整的請求引數和例外資訊
需求: 將請求資料, 回應資料以及例外資訊持久化到資料庫中, 要求最大限度提高性能, 不要讓日志處理影響業務時間
可能有的小伙伴一想到日志處理就會下意識想到aop處理, 但是僅僅通過aop處理的話, 我們會面臨兩個問題
1. 我們的性能如何保證
2.是否需要每一次請求都去持久化一次
這里我的方案是使用aop+異步佇列+批量持久化來實作, 其中使用異步佇列的靈感來源于nacos和zookeeper(居多)的架構設計
在zookeeper里大量使用了異步佇列來提高性能, 包括nacos的服務注冊也使用了異步佇列來提高性能, 這些佇列的實作既提高了性能, 也避免了執行緒的資源競爭
類結構
編碼階段, 主要分為這幾個部分
1. aop切面撰寫
2.佇列提交類
3.佇列處理worker
完成之后的類結構:
1. aop切面

AdviceProcessor和ExceptionAdviceProcessor: 切面處理
AbstractLogAspect: 切面處理的父類(可以選擇日志列印, 具體的處理動作延遲到子類)
EnduranceLogAspect: 持久化日志aop, 包括請求和回應, 并不處理例外
RequestRecordExceptionAdvice: 請求記錄例外處理
2.佇列提交類

AbstractRefreshTaskMessageProcessor: 此類囊括了所有的TaskMessageHandler(這里用到了策略模式)
SimpleTaskMessageProcessor: 提交任務到佇列
其實TaskMessageHandler并不是僅僅做請求日志處理的作業, 他可以做得事情有很多, 只要是需要通過異步佇列做得作業他都可以做
3.佇列處理worker(這個涉及的比較多)

我來介紹下這些類的作業職責
1. TaskMessageHandler<T extends Message> 佇列處理的頂級介面
2. QueueTaskManager<T extends Message> 佇列的管理者, 既存盤和獲取
3. AbstractQueueTaskManager<T extends Message> 佇列的統一管理者, 提供佇列的基礎api
4. AbstractQueueTaskWorkerLifecycleController<T extends Message> 佇列作業者生命周期控制器
5.AbstractBatchTaskWorker<T extends Message>批量處理任務作業者(重點)
6.BatchTaskProcessorMetadata 批量任務處理元資料
7.RequestRecordLogTaskWorker請求日志任務作業者(選擇性實作, 不固定, 可以不選擇持久化, 而是發到mq之類的做日志分析)
8. AbstractSimpleTaskWorker: 任務處理的邏輯我提供了第二種實作, 第一種就是上面的批量實作, 第二種就是這個簡單實作, 他在從佇列拿到一個資料之后不會等待, 而是馬上處理
上代碼
基礎代碼
/**
* Description:保存請求記錄
*
* @author zhaoguangyun
* @date 2021/7/26
*/
public class RequestRecordContext {
private static final ThreadLocal<RequestRecord> tl = new ThreadLocal<>();
public static RequestRecord get(){
return tl.get();
}
public static void set(RequestRecord requestRecord){
tl.set(requestRecord);
}
public static void remove(){
tl.remove();
}
}
public interface Message {
}
/**
* Description:
*
* @author zhaoguangyun
* @date 2021/7/26
*/
public class RequestRecord implements Message {
/**
* 主鍵
*/
private Long id;
private String requestId;
private String requestParam;
private String responseParam;
private String exceptionMsg;
private Date createTime;
public Long getId() {
return id;
}
public void setId(Long id) {
this.id = id;
}
public String getRequestId() {
return requestId;
}
public void setRequestId(String requestId) {
this.requestId = requestId;
}
public String getRequestParam() {
return requestParam;
}
public void setRequestParam(String requestParam) {
this.requestParam = requestParam;
}
public String getResponseParam() {
return responseParam;
}
public void setResponseParam(String responseParam) {
this.responseParam = responseParam;
}
public String getExceptionMsg() {
return exceptionMsg;
}
public void setExceptionMsg(String exceptionMsg) {
this.exceptionMsg = exceptionMsg;
}
public Date getCreateTime() {
return createTime;
}
public void setCreateTime(Date createTime) {
this.createTime = createTime;
}
}
/**
* Description:執行緒池配置
*
* @author zhaoguangyun
*/
@Component
public class TaskExecutorService implements BeanFactoryAware, InitializingBean {
private static Executor executor;
private static BeanFactory beanFactory;
@Override
public void setBeanFactory(BeanFactory beanFactory) throws BeansException {
TaskExecutorService.beanFactory = beanFactory;
}
public static void execute(Runnable runnable){
executor.execute(runnable);
}
public static void setExecutor(Executor executor) {
TaskExecutorService.executor = executor;
}
@Override
public void afterPropertiesSet() throws Exception {
setExecutor(beanFactory.getBean(TaskExecutor.class));
}
}
基礎代碼里包括ThreadLocal, 請求記錄物體以及執行緒池
aop切面代碼
/**
* Description:
*
* @author zhaoguangyun
*/
public interface AdviceProcessor {
void beforeProcessor(HttpServletRequest request, JoinPoint joinPoint);
void afterProcessor(Object o);
}
/**
* Description:
*
* @author zhaoguangyun
*/
public interface ExceptionAdviceProcessor extends AdviceProcessor{
void exceptionProcessor(Exception e);
@Override
default void afterProcessor(Object o){
}
@Override
default void beforeProcessor(HttpServletRequest request, JoinPoint joinPoint) {
}
}
/**
* Description:
*
* @author zhaoguangyun
*/
@Aspect
public abstract class AbstractLogAspect implements AdviceProcessor {
private static Logger log = LoggerFactory.getLogger(AdviceProcessor.class);
@Pointcut("@annotation(com.requestrecord.autoconfiguration.annotation.EnduranceLog)")
private void logAspect(){}
//可以不列印, 設定為false即可
@Value("${endurance.log.request-record.print:false}")
private boolean print;
/**
* @Description: 方法執行前, 列印請求資訊
* @Author: zhaoguangyun
* @param joinPoint:
* @return: void
**/
@Before("logAspect()")
public void before(JoinPoint joinPoint){
try {
ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
.getRequestAttributes();
if (Objects.isNull(requestAttributes)){
return;
}
HttpServletRequest request = requestAttributes.getRequest();
if (print){
// 列印請求內容
log.info("===============請求內容===============");
log.info("請求地址:" + request.getRequestURL().toString());
log.info("請求方式:" + request.getMethod());
log.info("請求類方法:" + joinPoint.getSignature());
log.info("請求類方法引數:" + JSONObject.toJSONString(joinPoint.getArgs()));
log.info("===============請求內容===============");
}
beforeProcessor(request,joinPoint);
} catch (Exception e) {
log.error("LogAspect#before() error >>> msg: "+e.getMessage());
}
}
/**
* @Description: 列印回傳引數
* @Author: zhaoguangyun
* @param o: 回傳值
* @return: void
**/
@AfterReturning(returning = "o",pointcut = "logAspect()")
public void afterReturn(Object o){
try {
if (print){
log.info("--------------回傳內容----------------");
log.info(JSONObject.toJSONString(o));
log.info("--------------回傳內容----------------");
}
afterProcessor(o);
} catch (Exception e) {
log.error("LogAspect#afterReturn() error >>> msg: "+e.getMessage());
}
}
@AfterThrowing(throwing = "e",pointcut = "logAspect()")
public void afterThrowing(Exception e){
if (print){
log.error("--------------發生例外----------------");
log.error(e.getMessage());
log.error("--------------發生例外----------------");
}
exceptionProcessor(e);
}
protected abstract void exceptionProcessor(Exception e);
}
/**
* Description:
*
* @author zhaoguangyun
*/
public class EnduranceLogAspect extends AbstractLogAspect {
@Autowired
private SimpleTaskMessageProcessor simpleTaskMessageProcessor;
@Override
public void beforeProcessor(HttpServletRequest request, JoinPoint joinPoint) {
RequestRecord requestRecord = getOrSetRequestRecord();
requestRecord.setRequestId(UUID.randomUUID().toString().replace("-",""));
requestRecord.setRequestParam(JSONObject.toJSONString(joinPoint.getArgs()));
}
@Override
public void afterProcessor(Object o) {
RequestRecord requestRecord = getOrSetRequestRecord();
requestRecord.setResponseParam(JSONObject.toJSONString(o));
//提交持久化任務
submit(requestRecord);
}
protected RequestRecord getOrSetRequestRecord(){
RequestRecord requestRecord = RequestRecordContext.get();
if (requestRecord != null){
return requestRecord;
}
requestRecord = new RequestRecord();
RequestRecordContext.set(requestRecord);
return requestRecord;
}
@Override
protected void exceptionProcessor(Exception e) {}
protected void submit(RequestRecord requestRecord){
simpleTaskMessageProcessor.processor(requestRecord);
}
}
/**
* Description:
*
* @author zhaoguangyun
*/
@Component
public class RequestRecordExceptionAdvice extends EnduranceLogAspect implements ExceptionAdviceProcessor{
@Override
public void exceptionProcessor(Exception e) {
RequestRecord requestRecord = RequestRecordContext.get();
submitRequestRecord(e,requestRecord);
}
protected void submitRequestRecord(Exception e,RequestRecord requestRecord){
StackTraceElement[] stackTrace = e.getStackTrace();
StringBuilder sb = new StringBuilder(e.getClass().getName());
sb.append(": ").append(e.getMessage());
sb.append("\n");
sb.append("\tat ");
Stream.of(stackTrace).forEach(s -> {
sb.append(s.getClassName()).append("(").append(s.getFileName()).append(":").append(s.getLineNumber()).append(")");
sb.append("\n");
sb.append("\tat ");
});
requestRecord.setExceptionMsg(sb.toString());
submit(requestRecord);
}
}
在aop切面里, 進行捕捉請求, 然后提交任務
佇列提交類
/**
* Description:
*
* @author zhaoguangyun
*/
public abstract class AbstractRefreshTaskMessageProcessor implements InitializingBean {
@Autowired
private List<TaskMessageHandler<?>> taskMessageHandlerList = new ArrayList<>();
private Map<Class<?>, TaskMessageHandler> taskMessageHandlerMap;
@Override
public void afterPropertiesSet() throws Exception {
taskMessageHandlerMap = new HashMap<>(
taskMessageHandlerList.size() + (taskMessageHandlerList.size() >>> 1));
for (TaskMessageHandler queueTaskManager : taskMessageHandlerList) {
taskMessageHandlerMap.put(queueTaskManager.handleType(),queueTaskManager);
}
}
protected Map<Class<?>, TaskMessageHandler> getTaskMessageHandlerMap() {
return taskMessageHandlerMap;
}
public abstract void processor(Message message);
}
@Component
public class SimpleTaskMessageProcessor extends AbstractRefreshTaskMessageProcessor {
@SuppressWarnings("unchecked")
@Override
public void processor(Message message) {
TaskExecutorService.execute(() -> {
TaskMessageHandler handler = getTaskMessageHandlerMap().get(message.getClass());
if (handler instanceof QueueTaskManager){
((QueueTaskManager) handler).offer(message);
}
});
}
}
佇列提交類主要作業就是將任務提交到佇列中
佇列處理worker
介面
public interface TaskMessageHandler<T extends Message> {
Class<T> handleType();
void batchHandle(List<T> t);
void handle(T t);
}
public interface QueueTaskManager<T extends Message> extends TaskMessageHandler<T> {
void offer(T requestRecord);
T take(long waitTime) throws InterruptedException;
T take() throws InterruptedException;
}
public interface BatchTaskProcessorMetadata {
int getMaxProcessCount();
long getMaxWaitTime();
}
這些介面規定了一些對外方法
具體實作
public abstract class AbstractQueueTaskManager<T extends Message>
implements Runnable,QueueTaskManager<T> {
protected static Logger log = LoggerFactory.getLogger(TaskMessageHandler.class);
private ArrayBlockingQueue<T> arrayBlockingQueue = new ArrayBlockingQueue<>(1024 * 1024);
@Override
public void offer(T requestRecord){
arrayBlockingQueue.offer(requestRecord);
}
@Override
public T take(long waitTime) throws InterruptedException {
return arrayBlockingQueue.poll(waitTime, TimeUnit.MILLISECONDS);
}
@Override
public T take() throws InterruptedException {
return arrayBlockingQueue.take();
}
}
此類是佇列的讀取類, 這里如果有需要可以換成redis, 我為了簡單, 弄了個記憶體佇列
其實這個類也可以設計成組合的方式, 而不是繼承的方式, 將處理和佇列訪問進行隔離, 方便換資料源, 解耦
public abstract class AbstractQueueTaskWorkerLifecycleController<T extends Message>
extends AbstractQueueTaskManager<T> implements BeanNameAware, DisposableBean, SmartLifecycle {
private AtomicBoolean running = new AtomicBoolean();
private String beanName;
@Override
public void start() {
if (running.compareAndSet(Boolean.FALSE, Boolean.TRUE)) {
//執行緒的生命周期跟隨springioc容器的生命周期, 使用執行緒池反而會浪費資源, 故直接new Thread
new Thread(this).start();
log.info("{} worker thread started ....", beanName);
}
}
@Override
public boolean isRunning() {
return running.get();
}
@Override
public void setBeanName(String beanName) {
this.beanName = beanName;
}
@Override
public void stop() {
running.compareAndSet(Boolean.TRUE, Boolean.FALSE);
}
@Override
public void destroy() throws Exception {
stop();
}
protected String getBeanName() {
return beanName;
}
}
此類主要就是對處理任務的執行緒做生命周期控制
public abstract class AbstractBatchTaskWorker<T extends Message> extends AbstractQueueTaskWorkerLifecycleController<T>
implements BatchTaskProcessorMetadata, InitializingBean {
//每次處理的數量
@Value("${endurance.log.request-record.maxProcessCount:500}")
private int maxProcessCount;
//如果數量不滿, 那么最大的等待時間
@Value("${endurance.log.request-record.maxWaitTime:5000}")
private int maxWaitTime;
@Override
public void run() {
List<T> list = new ArrayList<>();
long startTime = System.currentTimeMillis();
while (isRunning()) {
try {
//剩下的沉睡時間
long surplusWaitTime = getMaxWaitTime() - (System.currentTimeMillis() - startTime);
T t = take(surplusWaitTime);
if (t == null && getMaxWaitTime() - (System.currentTimeMillis() - startTime) > 0L) {
continue;
}
if (t != null) {
list.add(t);
}
if ((System.currentTimeMillis() - startTime) < getMaxWaitTime()
&& list.size() < getMaxProcessCount()) {
continue;
}
if (!CollectionUtils.isEmpty(list)) {
batchHandle(list);
}
list.clear();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
log.error("{}#run() error, msg: {}", getBeanName(), e.getMessage());
}
startTime = System.currentTimeMillis();
}
log.warn("{} worker thread shutdown ....", getBeanName());
}
@Override
public int getMaxProcessCount() {
return maxProcessCount;
}
@Override
public long getMaxWaitTime() {
return maxWaitTime;
}
@Override
public void handle(T t) {
}
@Override
public void afterPropertiesSet() throws Exception {
init();
}
protected void init() {
log.info("{} worker init", getBeanName());
}
}
此類主要做批量處理任務的作業, 當任務數量達到maxProcessCount時或者等待時間達到maxWaitTime時, 會進行處理
//此類為佇列處理的第二種實作, 簡單處理, 從佇列中take到資料后直接呼叫子類的處理方法, 不會等待
public abstract class AbstractSimpleTaskWorker<T extends Message> extends AbstractQueueTaskWorkerLifecycleController<T> {
private static Logger log = LoggerFactory.getLogger(AbstractSimpleTaskWorker.class);
@Override
public void run() {
while (isRunning()) {
try {
T t = take();
handle(t);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (Exception e) {
log.error("{}#run() error, msg: {}",getBeanName(), e.getMessage());
}
}
log.warn("{} worker thread shutdown ....", getBeanName());
}
@Override
public void batchHandle(List<T> t) {}
}
此類為佇列處理的第二種實作, 簡單處理, 從佇列中take到資料后直接呼叫子類的處理方法, 不會等待
/**
* Description:支持
*
* @author zhaoguangyun
*/
@Slf4j
@Component
public class RequestRecordLogTaskWorker extends AbstractBatchTaskWorker<RequestRecord> {
@Override
public Class<RequestRecord> handleType() {
//這里回傳的是期望處理的資料型別
return RequestRecord.class;
}
@Override
public void batchHandle(List<RequestRecord> t) {
//持久化的代碼或者其他處理邏輯......
}
}
以上就是全部的代碼
總結
我們來做一個流程的總結
1. springboot run啟動
2. spring ioc在填充完成SimpleTaskMessageProcessor bean時, 回呼他的afterPropertiesSet()方法
在afterPropertiesSet()方法里把所有TaskMessageHandler物件(交給spring管理的), 以 handleType()方法回傳的型別為key, 物件本身為value, 全部放入到Map中
3. 在spring ioc啟動完成后, 由于AbstractQueueTaskWorkerLifecycleController實作了SmartLifecycle介面(isAutoStartup()回傳true), 會回呼他的start方法, 啟動這個任務執行緒, 啟動之后執行run()方法, 在run方法里呼叫take方法, 此時沒有資料, 會陷入沉睡
在ioc關閉時, 如果isRunning()回傳true, 會回呼stop方法
4. 至此啟動完成, 此時有請求, 先進入到aop切面中進行決議, 封裝為一個RequestRecord物件, 放入到ThreadLocal中, 在請求結束后或者出現例外, 從ThreadLocal中拿到RequestRecord記錄資訊, 然后提交到SimpleTaskMessageProcessor中
5. SimpleTaskMessageProcessor提交任務到執行緒池, 此任務會根據提交的Message的class為key從Map中拿到一個TaskMessageHandler, 然后呼叫offer()方法, 將資料放入到queue中
6. queue中有資料, 喚醒沉睡的TaskMessageHandler執行緒, 此時拿到提交的資料, 如果不滿, 或者沉睡時間不足, 再次嘗試拿取, 第二次拿不到又會陷入沉睡, 如果數量達到了指定的maxProcessCount(子類可重寫)則會呼叫batchHandle方法處理, 具體的邏輯是持久化還是其他邏輯, 就由你定制了.
其實第5步和第6步的邏輯, 比較通用, 并不一定是處理日志, 也可以做其他的邏輯, 只需要寫一個AbstractBatchTaskWorker或者AbstractSimpleTaskWorker的子類, 然后把任務提交到SimpleTaskMessageProcessor中就可以了
流程圖

大家有什么建議歡迎在評論區指出, 謝謝
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/295226.html
標籤:其他
上一篇:c++基礎——區分參考和指標
