主頁 >  其他 > 跟著原始碼看lcn分布式事務

跟著原始碼看lcn分布式事務

2020-09-14 09:05:29 其他

跟著原始碼看lcn分布式事務

lcn分布式事務具體實作思路是服務器A創建事務,構建事務資訊并將事務資訊發送到事務處理器,處理程序中可能用到服務器B、C,會將事務組Id傳給B、C,B、C業務處理完成后將事務資訊加入到對應的事務組,并且創建一個執行緒對事務組狀態進行檢測A事務處理完成后向服務其發送的事務狀態,B、C會根據監督的狀態判斷對事務進行具體的提交或者回滾操作,

分布式事務流程圖

在這里插入圖片描述

lcn分布式事務是通過切面進行實作,lcn內部共有兩個aop切面,分別是DataSourceAspect和TransactionAspect,DataSourceAspect攔截對應的資料庫連接(connection),TransactionAspect進行具體的分布式事務的實作,這里事務的開始是通過TransactionAspect進行開始,這里僅截取lcn部分代碼去介紹,

private final DTXLogicWeaver dtxLogicWeaver;

@Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)")
public void lcnTransactionPointcut() {
}    

@Around("lcnTransactionPointcut() && !txcTransactionPointcut()" +
        "&& !tccTransactionPointcut() && !txTransactionPointcut()")
public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
    DTXInfo dtxInfo = DTXInfo.getFromCache(point);
    LcnTransaction lcnTransaction = dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
    dtxInfo.setTransactionType(Transactions.LCN);
    dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
    return dtxLogicWeaver.runTransaction(dtxInfo, point::proceed);
}

事務攔截器通過攔截帶有LcnTransaction注解的方法后進行攔截處理,然后啟動事務runTransaction方法啟動具體的分布式事務實作類,

public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {

    log.debug("<---- TxLcn start ---->");
  	//這里獲取dtxLocalContext,創建事務時新建,每個事務之只能有一個存在
    DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
    TxContext txContext;
    // ---------- 保證每個模塊在一個DTX下只會有一個TxContext ---------- //
  	//判斷是否存在事務資訊,沒有啟動事務,有加入到事務
    if (globalContext.hasTxContext()) {
        // 有事務背景關系的獲取事務背景關系
        txContext = globalContext.txContext();
        dtxLocalContext.setInGroup(true);
        log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
        // 本地事務呼叫
        if (Objects.nonNull(dtxLocalContext.getGroupId())) {
            dtxLocalContext.setDestroy(false);
        }
    } else {
        // 沒有的開啟本地事務背景關系
        txContext = globalContext.startTx();
        dtxLocalContext.setInGroup(false);
    }

    dtxLocalContext.setUnitId(dtxInfo.getUnitId());
    dtxLocalContext.setGroupId(txContext.getGroupId());
    dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());

    // 事務引數
    TxTransactionInfo info = new TxTransactionInfo();
    info.setBusinessCallback(business);
    info.setGroupId(txContext.getGroupId());
    info.setUnitId(dtxInfo.getUnitId());
    info.setPointMethod(dtxInfo.getBusinessMethod());
    info.setPropagation(dtxInfo.getTransactionPropagation());
    info.setTransactionInfo(dtxInfo.getTransactionInfo());
    info.setTransactionType(dtxInfo.getTransactionType());
    info.setTransactionStart(txContext.isDtxStart());

    //LCN事務處理器
    try {
        return transactionServiceExecutor.transactionRunning(info);
    } finally {
        // 執行緒執行業務完畢清理本地資料
        if (dtxLocalContext.isDestroy()) {
            // 通知事務執行完畢
            synchronized (txContext.getLock()) {
                txContext.getLock().notifyAll();
            }

            // TxContext生命周期是? 和事務組一樣(不與具體模塊相關的)
            if (!dtxLocalContext.isInGroup()) {
                globalContext.destroyTx();
            }
        }
        if(info.isTransactionStart() && info.getPropagation().equals(DTXPropagation.REQUIRES_NEW)) {
            DTXLocalContext.makeNeverAppeared();
            TracingContext.tracing().destroy();
        }
        log.debug("<---- TxLcn end ---->");
    }
}

這里會存在一個unitId代表每個服務的唯一性代碼,如果存在直接回傳,如果不存在則新建,是從DTXInfo中獲取,通過Transactions的unitId方式生成(具體代碼放在了業務代碼里,不方便貼出來),

//根據方法名生成unitId然后去dtxInfoCache中獲取,存在直接回傳,不存在存入快取中然后回傳
public static DTXInfo getFromCache(MethodInvocation methodInvocation) {
    String signature = methodInvocation.getMethod().toString();
    String unitId = Transactions.unitId(signature);
    DTXInfo dtxInfo = dtxInfoCache.get(unitId);
    if (Objects.isNull(dtxInfo)) {
        dtxInfo = new DTXInfo(methodInvocation.getMethod(),
                methodInvocation.getArguments(), methodInvocation.getThis().getClass());
        dtxInfoCache.put(unitId, dtxInfo);
    }
    dtxInfo.reanalyseMethodArgs(methodInvocation.getArguments());
    return dtxInfo;
}
//md5根據方法的signature進行計算得到
public static String unitId(String methodSignature) {
    return DigestUtils.md5DigestAsHex((APPLICATION_ID_WHEN_RUNNING + methodSignature).getBytes());
}

DTXLocalContext.getOrNew(),會獲取一個本地事務背景關系資訊,由于同一個事務可能多次用到,這里使用的時getOrNew當第一次使用使用直接新建,并保存在ThreadLocal變數currentLocal里,否則回傳已經存在的,這里是為了保證每個請求對應獲取到的是同一個類,避免并發問題引起分布式事務問題,

private final static ThreadLocal<Map<String, DTXLocalContext>> currentLocal = new InheritableThreadLocal<>();

public static DTXLocalContext getOrNew() {
    //這里是dataSource的名稱,可能是動態資料源,系統不同獲取就不同,這里把本系統的代碼去掉了
    String dataSourceName = "dataSource";
    if (currentLocal.get() == null) {
      	//存在背景關系資訊,當時不存在對應的dataSource的名稱,針對多資料源
        currentLocal.set(new ConcurrentHashMap<String, DTXLocalContext>());
    }
  	//首先獲取背景關系資訊
    DTXLocalContext dtxLocalContext = currentLocal.get().get(dataSourceName);
    if(null == dtxLocalContext) {
      	// 不存在直接新建事務背景關系并保存到本地集合
        dtxLocalContext = new DTXLocalContext();
        currentLocal.get().put(dataSourceName, dtxLocalContext);
    }
    //回傳背景關系資訊
    return dtxLocalContext;
}

globalContext.startTx(),啟動事務生成一個groupId然后將其存入ThreadLocal中,

public TxContext startTx() {
    TxContext txContext = new TxContext();
    // 事務發起方判斷,這里判斷是否有事務根據事務組id(groupId)進行判斷
    txContext.setDtxStart(!TracingContext.tracing().hasGroup());
    if (txContext.isDtxStart()) {
      	//沒有啟動事務則啟動事務
        TracingContext.tracing().beginTransactionGroup();
    }
    txContext.setGroupId(TracingContext.tracing().groupId());
    String txContextKey = txContext.getGroupId() + ".dtx";
    attachmentCache.attach(txContextKey, txContext);
    log.debug("Start TxContext[{}]", txContext.getGroupId());
    return txContext;
}

新建事務時會存在一個groupId代表整個事務唯一id,當事務不存在的情況下(根據groupId進行判斷,tracingContextThreadLocal中不存在groupId則不存在,否則就是存在,不存在的初始化一個,會隨機生成一個groupId,在tracingContextThreadLocal中加入初始化的事務資訊)

private static ThreadLocal<TracingContext> tracingContextThreadLocal = new ThreadLocal<>();

public static TracingContext tracing() {
    if (tracingContextThreadLocal.get() == null) {
        tracingContextThreadLocal.set(new TracingContext());
    }
    return tracingContextThreadLocal.get();
}
    

public void beginTransactionGroup() {
    if (hasGroup()) {
        return;
    }
  	//不存在事務組的情況下,初始化一個,gourpId是一個亂數
    init(Maps.newHashMap(TracingConstants.GROUP_ID, RandomUtils.randomKey(), TracingConstants.APP_MAP, "{}"));
}

public static void init(Map<String, String> initFields) {
    // 將生成的TracingContext資訊加入到tracingContextThreadLocal
    if (Objects.isNull(initFields)) {
        log.warn("init tracingContext fail. null init fields.");
        return;
    }
    TracingContext tracingContext = tracing();
    if (Objects.isNull(tracingContext.fields)) {
        tracingContext.fields = new HashMap<>();
    }
    tracingContext.fields.putAll(initFields);
}

具體處理邏輯在這里,這里會有事務的傳播行為處理propagationResolver.resolvePropagationState(info),事務處理前操作(向服務端發送請求,將事務加入到事務組)dtxLocalControl.preBusinessCode(info)、具體的業務處理dtxLocalControl.doBusinessCode(info)、業務成功后的操作dtxLocalControl.onBusinessCodeSuccess(info, result)、業務執行失敗操作dtxLocalControl.onBusinessCodeError(info, e)、業務結束操作dtxLocalControl.postBusinessCode(info),

public Object transactionRunning(TxTransactionInfo info) throws Throwable {

        // 1. 獲取事務型別
        String transactionType = info.getTransactionType();

        // 2. 獲取事務傳播狀態
        DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);

        // 2.1 如果不參與分布式事務立即終止
        if (propagationState.isIgnored()) {
            return info.getBusinessCallback().call();
        }

        // 3. 獲取本地分布式事務控制器
        DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);

        // 4. 織入事務操作
        try {
            // 4.1 記錄事務型別到事務背景關系
            Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
            transactionTypeSet.add(transactionType);

            dtxLocalControl.preBusinessCode(info);

            // 4.2 業務執行前
            txLogger.txTrace(
                    info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);

            // 4.3 執行業務
            Object result = dtxLocalControl.doBusinessCode(info);

            // 4.4 業務執行成功
            txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
            dtxLocalControl.onBusinessCodeSuccess(info, result);
            return result;
        } catch (TransactionException e) {
            txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
            throw e;
        } catch (Throwable e) {
            // 4.5 業務執行失敗
            txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
                    "business code error");
            dtxLocalControl.onBusinessCodeError(info, e);
            throw e;
        } finally {
            // 4.6 業務執行完畢
            dtxLocalControl.postBusinessCode(info);
        }
    }

傳播行為處理操作,具體處理是根據傳遞的屬性判斷、將事務加入到事務組、新建事務或直接不加入事務,這里不介紹具體的傳播行為,不理解的可以執行查找了解下,

public DTXPropagationState resolvePropagationState(TxTransactionInfo txTransactionInfo) throws TransactionException {

        // 本地已在DTX,根據事務傳播,靜默加入
        if (DTXLocalContext.cur().isInGroup()) {
            log.debug("SILENT_JOIN group!");
            return DTXPropagationState.SILENT_JOIN;
        }

        // 發起方之前沒有事務
        if (txTransactionInfo.isTransactionStart()) {
            // 根據事務傳播,對于 SUPPORTS 不參與事務
            if (DTXPropagation.SUPPORTS.equals(txTransactionInfo.getPropagation())) {
                return DTXPropagationState.NON;
            }
            // 根據事務傳播,創建事務
            return DTXPropagationState.CREATE;
        }

        // 已經存在DTX,根據事務傳播,加入
        return DTXPropagationState.JOIN;
    }

業務處理前操作dtxLocalControl.preBusinessCode(info),這里dtxLocalControl有兩個實作類,分別對應創建事務LcnStartingTransaction和加入事務LcnRunningTransaction,這里使用的是創建事務,

public class LcnStartingTransaction implements DTXLocalControl {

    private final TransactionControlTemplate transactionControlTemplate;

    private final TCGlobalContext globalContext;


@Autowired
public LcnStartingTransaction(TransactionControlTemplate transactionControlTemplate, TCGlobalContext globalContext) {
 	this.transactionControlTemplate = transactionControlTemplate;
    this.globalContext = globalContext;
}

    @Override
    public void preBusinessCode(TxTransactionInfo info) throws TransactionException {
        // 創建事務操作
        transactionControlTemplate.createGroup(
                info.getGroupId(), info.getUnitId(), info.getTransactionInfo(), info.getTransactionType());
        DTXLocalContext.makeProxy();
    }

    @Override
    public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
        DTXLocalContext.cur().setSysTransactionState(0);
    }

    @Override
    public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) {
        DTXLocalContext.cur().setSysTransactionState(1);
    }

    @Override
    public void postBusinessCode(TxTransactionInfo info) {
        // 這里是通知事務完成
        transactionControlTemplate.notifyGroup(
                info.getGroupId(), info.getUnitId(), info.getTransactionType(),
                DTXLocalContext.transactionState(globalContext.dtxState(info.getGroupId())));
		DTXLocalContext.makeNeverAppeared();
    }
}

創建事務組,向事務服務器發送事務,TransactionControlTemplate通過createGroup方法想事務服務其發送事務,

public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
            throws TransactionException {
        //創建事務組
        try {
            // 日志
            txLogger.txTrace(groupId, unitId,
                    "create group > transaction type: {}", transactionType);
            // 創建事務組訊息,想服務端發送服務請求
            reliableMessenger.createGroup(groupId);
            // 快取發起方切面資訊
            aspectLogger.trace(groupId, unitId, transactionInfo);
        } catch (RpcException e) {
            // 通訊例外
            dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
        } catch (LcnBusinessException e) {
            // 創建事務組業務失敗
            dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
        }
        txLogger.txTrace(groupId, unitId, "create group over");
    }
	
	//向服務器端發送請求創建事務
    @Override
    public void createGroup(String groupId) throws RpcException, LcnBusinessException {
        // TxManager創建事務組
        MessageDto messageDto = request(MessageCreator.createGroup(groupId));
        if (!MessageUtils.statusOk(messageDto)) {
            throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
        }
    }

通知事務完成,向服務器發送請求通知事務完成,這里是由于非事務創建者會有一個執行緒來獲取該狀態進行回滾使用,

@Override
public int notifyGroup(String groupId, int transactionState) throws RpcException, LcnBusinessException {
    NotifyGroupParams notifyGroupParams = new NotifyGroupParams();
    notifyGroupParams.setGroupId(groupId);
    notifyGroupParams.setState(transactionState);
    //具體的發送請求資訊,這里不詳細介紹
    MessageDto messageDto = request0(MessageCreator.notifyGroup(notifyGroupParams),
            clientConfig.getTmRpcTimeout() * clientConfig.getChainLevel());
    // 成功清理發起方事務
    if (!MessageUtils.statusOk(messageDto)) {
        throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
    }
    return messageDto.loadBean(Integer.class);
}

這里是通知事務完成,會向服務器發送一個資訊,來告訴事務服務器事務處理完成,成功了或者失敗了,

@Override
public int notifyGroup(String groupId, int transactionState) throws RpcException, LcnBusinessException {
    NotifyGroupParams notifyGroupParams = new NotifyGroupParams();
    notifyGroupParams.setGroupId(groupId);
    notifyGroupParams.setState(transactionState);
    MessageDto messageDto = request0(MessageCreator.notifyGroup(notifyGroupParams),
            clientConfig.getTmRpcTimeout() * clientConfig.getChainLevel());
    // 成功清理發起方事務
    if (!MessageUtils.statusOk(messageDto)) {
        throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
    }
    return messageDto.loadBean(Integer.class);
}

具體的事務處理是直接呼叫業務代碼,這里就不介紹了,下connect切面DataSourceAspect,業務中每次獲取資料庫連接都通過切面進行處理,

@Around("execution(* javax.sql.DataSource.getConnection(..))")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        return dtxResourceWeaver.getConnection(() -> (Connection) point.proceed());
    }

切面的具體處理是將connection通過TransactionResourceProxy代理類進行處理,然后統一回傳代理類處理邏輯如下,

public Object getConnection(ConnectionCallback connectionCallback) throws Throwable {
   DTXLocalContext dtxLocalContext = DTXLocalContext.cur();
   if (Objects.nonNull(dtxLocalContext) && dtxLocalContext.isProxy()) {
      String transactionType = dtxLocalContext.getTransactionType();
      TransactionResourceProxy resourceProxy = txLcnBeanHelper.loadTransactionResourceProxy(transactionType);
     //這里是創建連接或獲取代理類,并將其存入的集合中(key使用groupId)
      Connection connection = resourceProxy.proxyConnection(connectionCallback);
      if(!connection.isClosed()) {
         if(connection instanceof LcnConnectionProxy) {
            log.debug("proxy a LcnConnectionProxy connection: {}.", ((LcnConnectionProxy)connection).getTarget());
         }
         else {
            log.debug("proxy a sql connection: {}.", connection);
         }
         return connection;
      }
   }
   return connectionCallback.call();
}

這里是資料庫連接類的處理會從globalContext獲取一個連接,如果沒有將新建一個連接然后加入到對應的ConcurrentHashMap中(這里有groupId做key不用擔心存在多執行緒問題),這里是為后面提交和回滾操作做鋪墊,

@Override
public Connection proxyConnection(ConnectionCallback connectionCallback) throws Throwable {
    String groupId = DTXLocalContext.cur().getGroupId();
    try {
      	//通過背景關系獲取資料庫連接
        return globalContext.getLcnConnection(groupId, DynamicDataSourceHolder.getDataSource());
    } catch (TCGlobalContextException e) {
      	//如果不存在連接,直接新建一個連接,將自動提交設定成否,然后加入到ConcurrentHashMap中,這里是為后續操作做鋪墊
        LcnConnectionProxy lcnConnectionProxy = new LcnConnectionProxy(connectionCallback.call());
        globalContext.setLcnConnection(groupId, DynamicDataSourceHolder.getDataSource(), lcnConnectionProxy);
        lcnConnectionProxy.setAutoCommit(false);
        return lcnConnectionProxy;
    }
}

加入事務,這里通過類LcnRunningTransaction進行處理,這里是將preBusinessCode方法中創建事務去掉,然后將onBusinessCodeSuccess的發送事務成功的方法方法改成事務向事務服務器發送加入事務組請求,其內同步添加異步檢測方法,

public class LcnRunningTransaction implements DTXLocalControl {
    
    private final TransactionCleanTemplate transactionCleanTemplate;
    
    private final TransactionControlTemplate transactionControlTemplate;
    
    @Autowired
    public LcnRunningTransaction(TransactionCleanTemplate transactionCleanTemplate,
                                 TransactionControlTemplate transactionControlTemplate) {
        this.transactionCleanTemplate = transactionCleanTemplate;
        this.transactionControlTemplate = transactionControlTemplate;
    }
    
    
    @Override
    public void preBusinessCode(TxTransactionInfo info) {
        DTXLocalContext.makeProxy();
    }
    
    
    @Override
    public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
        try {
          	//呼叫事務的清理方法
            transactionCleanTemplate.clean(info.getGroupId(), info.getUnitId(), info.getTransactionType(), 0);
        } catch (TransactionClearException e) {
            log.error("{} > clean transaction error." , Transactions.LCN);
        }
    }
    
    
    @Override
    public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) throws TransactionException {
        log.debug("join group: [GroupId: {},Method: {}]" , info.getGroupId(),
                info.getTransactionInfo().getMethodStr());
        
        //加入事務組
        transactionControlTemplate.joinGroup(info.getGroupId(), info.getUnitId(), info.getTransactionType(),
                info.getTransactionInfo());
    }
    
}

非事務服務器創建成功后,將其向事務服務器發請求,告訴事務服務器該服務器事務處理完成,將其加入事務組,加入成功后會有一個發起一個執行緒進行監督事務事務是否處理完成,對應的是事務創建服務器中的發送狀態,

public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo)
            throws TransactionException {
        try {
            txLogger.txTrace(groupId, unitId, "join group > transaction type: {}", transactionType);
			//這里將事務加入到事務組
            reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState(globalContext.dtxState(groupId)));

            txLogger.txTrace(groupId, unitId, "join group message over.");

            // 異步檢測,在這里檢測事務是否處理完成
            dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);

            // 快取參與方切面資訊
            aspectLogger.trace(groupId, unitId, transactionInfo);
        } catch (RpcException e) {
            dtxExceptionHandler.handleJoinGroupMessageException(Arrays.asList(groupId, unitId, transactionType), e);
        } catch (LcnBusinessException e) {
            dtxExceptionHandler.handleJoinGroupBusinessException(Arrays.asList(groupId, unitId, transactionType), e);
        }
        txLogger.txTrace(groupId, unitId, "join group logic over");
    }

這里是具體的檢測方法,這里是通過執行緒池進行定時向服務器發送請求,獲取事物的狀態,即事物的創建者是否完成事物的創建,

public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
        txLogger.taskTrace(groupId, unitId, "start delay checking task");
        ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
            try {
                TxContext txContext = globalContext.txContext(groupId);
                if (Objects.nonNull(txContext)) {
                    synchronized (txContext.getLock()) {
                        txLogger.taskTrace(groupId, unitId, "checking waiting for business code finish.");
                        txContext.getLock().wait();
                    }
                }
              	//這里是發送事務請求,查看是否發送成功
                int state = reliableMessenger.askTransactionState(groupId, unitId);
                txLogger.taskTrace(groupId, unitId, "ask transaction state {}", state);
                if (state == -1) {
                    txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
                  	//這里是發送請求后,事務失敗后的操作
                    onAskTransactionStateException(groupId, unitId, transactionType);
                } else {
                  	//這里是事務處理成功的操作
                    transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
                    aspectLogger.clearLog(groupId, unitId);
                }

            } catch (RpcException e) {
                onAskTransactionStateException(groupId, unitId, transactionType);
            } catch (TransactionClearException | InterruptedException e) {
                txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
            }
        }, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);
        delayTasks.put(groupId + unitId, scheduledFuture);
    }

事務成功后操作具體的介面為TransactionCleanService,這里對應的是三個實作類,對應三種事務處理方法,我們的是lcn,通過groupId取到資料源切面的連接代理類,



@Override
    public void clear(String groupId, int state, String unitId, String unitType) throws TransactionClearException {
        try {
        	//這里是獲取資料庫連接操作,跟上面的事務切面相對應,同時groupId取得事務組連接
            Set<LcnConnectionProxy> connectionProxy = globalContext.getLcnConnection(groupId);
            //取得的連接進行提交或者回滾操作
            connectionProxy.forEach(con -> con.notify(state));
            // todo notify exception
        } catch (TCGlobalContextException e) {
            log.warn("Non lcn connection when clear transaction.");
        } finally {
			DTXLocalContext.makeNeverAppeared();
		}
    }

這里是具體的事務成功或者失敗,進行提交和回滾的操作,這里具體的類為LcnConnectionProxy,對應事務的成功操作后的提交和回滾操作,這里的connection適合DataSourceAspect生成的連接是同一個,之前是通過groupId存入ConcurrentHashMap,這里再通過groupId取出對應的資料庫連接,

public RpcResponseState notify(int state) {
        try {
        	if(!connection.isClosed()) {
        		if (state == 1) {
        			log.debug("commit transaction type[lcn] proxy connection:{}.", connection);
        			//事務成功進行提交操作
        			connection.commit();
        		} else {
        			log.debug("rollback transaction type[lcn] proxy connection:{}.", connection);
        			//事務失敗進行回滾操作
        			connection.rollback();
        		}
        		connection.close();
        	}
            log.debug("transaction type[lcn] proxy connection:{} closed.", connection);
            return RpcResponseState.success;
        } catch (Exception e) {
            log.error(e.getLocalizedMessage(), e);
            return RpcResponseState.fail;
        }
    }

這里是事務處理失敗的情況,是將事務發送狀態設定為失敗,然后呼叫上面的clean命令進行回滾,失敗和成功最終都會呼叫clear方法進行事物的處理,失敗釋放在例外中進行處理,但是失敗會分兩種情況,是否創建過connection,創建了就進行回滾,未創建則不需要,這里可能會存在多種情況(由于發生例外的情況不同導致,有些是可以提交的,有些不能)這里不詳細介紹,具體思路即能提交的就調將狀態設定成1,然后clear方法進行提交,不能提交的則將狀態設定成非1,

@Override
    public void handleNotifyGroupBusinessException(Object params, Throwable ex) {
        List paramList = (List) params;
        String groupId = (String) paramList.get(0);
        int state = (int) paramList.get(1);
        String unitId = (String) paramList.get(2);
        String transactionType = (String) paramList.get(3);

        if(null != ex) {
            //用戶強制回滾.
            if (ex instanceof UserRollbackException) {
                state = 0;
            }
            if ((ex.getCause() != null && ex.getCause() instanceof UserRollbackException)) {
                state = 0;
            }
        }
        else {
            // LCN例外
            state = 0;
        }

        // 結束事務
        try {
            transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
        } catch (TransactionClearException e) {
            txLogger.error(groupId, unitId, "notify group", "{} > clean transaction error.", transactionType);
        }
    }

事物的groupId傳遞,Tracings這里通過將請求頭進行處理,這里是通過restTemplate攔截器進行處理(其它方式如dubbo、feign可自行查看原始碼),具體思路是事務創建者將groupId存入請求頭,事務的加入者會根據請求頭來獲取groupId然后根據groupId初始化TracingContext,

public class RestTemplateTracingTransmitter implements ClientHttpRequestInterceptor {

    @Autowired
    public RestTemplateTracingTransmitter(@Autowired(required = false) List<RestTemplate> restTemplates) {
        if (Objects.nonNull(restTemplates)) {
            restTemplates.forEach(restTemplate -> {
                List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
                interceptors.add(interceptors.size(), RestTemplateTracingTransmitter.this);
            });
        }
    }

    @Override
    @NonNull
    public ClientHttpResponse intercept(
            @NonNull HttpRequest httpRequest, @NonNull byte[] bytes,
            @NonNull ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
      	//這里是Lambda運算式寫法httpRequest.getHeaders()::add是對TracingSetter的set方法的實作
        Tracings.transmit(httpRequest.getHeaders()::add);
        return clientHttpRequestExecution.execute(httpRequest, bytes);
    }
}
//這里springMvc的配置類,可以進行具體業務處理前的一些準備作業如解碼
public class SpringTracingApplier implements com.codingapi.txlcn.tracing.http.spring.HandlerInterceptor, WebMvcConfigurer {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
      	//這里是Lambda運算式寫法httpRequest.getHeaders()::add是對TracingGetter的get方法的實作
        Tracings.apply(request::getHeader);
        return true;
    }
	//將攔截器加入到容器
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(this);
    }
}


public class Tracings {
  	//私有構造器,保證了該方法不會被外部直接構建,確保gourpId能夠被正確傳遞
    private Tracings() {
    }

    public static void transmit(TracingSetter tracingSetter) {
      	//請求發起前,將groupId存入報文頭
        if (TracingContext.tracing().hasGroup()) {
            log.debug("tracing transmit group:{}", TracingContext.tracing().groupId());
            tracingSetter.set(TracingConstants.HEADER_KEY_GROUP_ID, TracingContext.tracing().groupId());
            tracingSetter.set(TracingConstants.HEADER_KEY_APP_MAP,									     									Base64Utils.encodeToString(TracingContext.tracing().                                                                           					appMapString().getBytes(StandardCharsets.UTF_8)));
        }
    }

    public static void apply(TracingGetter tracingGetter) {
        String groupId = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_GROUP_ID)).orElse("");
        String appList = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_APP_MAP)).orElse("");
      	//這里和之前的init方法一致,這里是通過獲取報文頭的獲取groupId進行初始化,保證事務創建者的groupId能傳過來
        TracingContext.init(Maps.newHashMap(TracingConstants.GROUP_ID, groupId, TracingConstants.APP_MAP,
                StringUtils.isEmpty(appList) ? appList : new String(Base64Utils.decodeFromString(appList), 					   StandardCharsets.UTF_8)));
        if (TracingContext.tracing().hasGroup()) {
            log.debug("tracing apply group:{}, app map:{}", groupId, appList);
        }
    }

    public interface TracingSetter {
        void set(String key, String value);
    }


    public interface TracingGetter {
        String get(String key);
    }
}

這是lcn分布式事務的處理的具體思路,這里介紹的還是有點缺陷的,事物的補償并未考慮(概率很小),即當事務全部處理完成后,事務服務器收到事務創建者的資訊后,事務服務器或者后面用到的服務器夯機如何處理,也就是創建者服務器的事務提交了,其他服務器的事務提交不了,我這里想法是通過redis對應事務組資訊去進行補償處理,各位小伙伴也可以考慮下如何保證進行 補償或者如何保證事務的強一致性,

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/33101.html

標籤:其他

上一篇:JZ54 字符流中第一個不重復的字符

下一篇:1萬小時定律,各位還差多少小時?

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 網閘典型架構簡述

    網閘架構一般分為兩種:三主機的三系統架構網閘和雙主機的2+1架構網閘。 三主機架構分別為內端機、外端機和仲裁機。三機無論從軟體和硬體上均各自獨立。首先從硬體上來看,三機都用各自獨立的主板、記憶體及存盤設備。從軟體上來看,三機有各自獨立的作業系統。這樣能達到完全的三機獨立。對于“2+1”系統,“2”分為 ......

    uj5u.com 2020-09-10 02:00:44 more
  • 如何從xshell上傳檔案到centos linux虛擬機里

    如何從xshell上傳檔案到centos linux虛擬機里及:虛擬機CentOs下執行 yum -y install lrzsz命令,出現錯誤:鏡像無法找到軟體包 前言 一、安裝lrzsz步驟 二、上傳檔案 三、遇到的問題及解決方案 總結 前言 提示:其實很簡單,往虛擬機上安裝一個上傳檔案的工具 ......

    uj5u.com 2020-09-10 02:00:47 more
  • 一、SQLMAP入門

    一、SQLMAP入門 1、判斷是否存在注入 sqlmap.py -u 網址/id=1 id=1不可缺少。當注入點后面的引數大于兩個時。需要加雙引號, sqlmap.py -u "網址/id=1&uid=1" 2、判斷文本中的請求是否存在注入 從文本中加載http請求,SQLMAP可以從一個文本檔案中 ......

    uj5u.com 2020-09-10 02:00:50 more
  • Metasploit 簡單使用教程

    metasploit 簡單使用教程 浩先生, 2020-08-28 16:18:25 分類專欄: kail 網路安全 linux 文章標簽: linux資訊安全 編輯 著作權 metasploit 使用教程 前言 一、Metasploit是什么? 二、準備作業 三、具體步驟 前言 Msfconsole ......

    uj5u.com 2020-09-10 02:00:53 more
  • 游戲逆向之驅動層與用戶層通訊

    驅動層代碼: #pragma once #include <ntifs.h> #define add_code CTL_CODE(FILE_DEVICE_UNKNOWN,0x800,METHOD_BUFFERED,FILE_ANY_ACCESS) /* 更多游戲逆向視頻www.yxfzedu.com ......

    uj5u.com 2020-09-10 02:00:56 more
  • 北斗電力時鐘(北斗授時服務器)讓網路資料更精準

    北斗電力時鐘(北斗授時服務器)讓網路資料更精準 北斗電力時鐘(北斗授時服務器)讓網路資料更精準 京準電子科技官微——ahjzsz 近幾年,資訊技術的得了快速發展,互聯網在逐漸普及,其在人們生活和生產中都得到了廣泛應用,并且取得了不錯的應用效果。計算機網路資訊在電力系統中的應用,一方面使電力系統的運行 ......

    uj5u.com 2020-09-10 02:01:03 more
  • 【CTF】CTFHub 技能樹 彩蛋 writeup

    ?碎碎念 CTFHub:https://www.ctfhub.com/ 筆者入門CTF時時剛開始刷的是bugku的舊平臺,后來才有了CTFHub。 感覺不論是網頁UI設計,還是題目質量,賽事跟蹤,工具軟體都做得很不錯。 而且因為獨到的金幣制度的確讓人有一種想去刷題賺金幣的感覺。 個人還是非常喜歡這個 ......

    uj5u.com 2020-09-10 02:04:05 more
  • 02windows基礎操作

    我學到了一下幾點 Windows系統目錄結構與滲透的作用 常見Windows的服務詳解 Windows埠詳解 常用的Windows注冊表詳解 hacker DOS命令詳解(net user / type /md /rd/ dir /cd /net use copy、批處理 等) 利用dos命令制作 ......

    uj5u.com 2020-09-10 02:04:18 more
  • 03.Linux基礎操作

    我學到了以下幾點 01Linux系統介紹02系統安裝,密碼啊破解03Linux常用命令04LAMP 01LINUX windows: win03 8 12 16 19 配置不繁瑣 Linux:redhat,centos(紅帽社區版),Ubuntu server,suse unix:金融機構,證券,銀 ......

    uj5u.com 2020-09-10 02:04:30 more
  • 05HTML

    01HTML介紹 02頭部標簽講解03基礎標簽講解04表單標簽講解 HTML前段語言 js1.了解代碼2.根據代碼 懂得挖掘漏洞 (POST注入/XSS漏洞上傳)3.黑帽seo 白帽seo 客戶網站被黑帽植入劫持代碼如何處理4.熟悉html表單 <html><head><title>TDK標題,描述 ......

    uj5u.com 2020-09-10 02:04:36 more
最新发布
  • 2023年最新微信小程式抓包教程

    01 開門見山 隔一個月發一篇文章,不過分。 首先回顧一下《微信系結手機號資料庫被脫庫事件》,我也是第一時間得知了這個訊息,然后跟蹤了整件事情的經過。下面是這起事件的相關截圖以及近日流出的一萬條資料樣本: 個人認為這件事也沒什么,還不如關注一下之前45億快遞資料查詢渠道疑似在近日復活的訊息。 訊息是 ......

    uj5u.com 2023-04-20 08:48:24 more
  • web3 產品介紹:metamask 錢包 使用最多的瀏覽器插件錢包

    Metamask錢包是一種基于區塊鏈技術的數字貨幣錢包,它允許用戶在安全、便捷的環境下管理自己的加密資產。Metamask錢包是以太坊生態系統中最流行的錢包之一,它具有易于使用、安全性高和功能強大等優點。 本文將詳細介紹Metamask錢包的功能和使用方法。 一、 Metamask錢包的功能 數字資 ......

    uj5u.com 2023-04-20 08:47:46 more
  • vulnhub_Earth

    前言 靶機地址->>>vulnhub_Earth 攻擊機ip:192.168.20.121 靶機ip:192.168.20.122 參考文章 https://www.cnblogs.com/Jing-X/archive/2022/04/03/16097695.html https://www.cnb ......

    uj5u.com 2023-04-20 07:46:20 more
  • 從4k到42k,軟體測驗工程師的漲薪史,給我看哭了

    清明節一過,盲猜大家已經無心上班,在數著日子準備過五一,但一想到銀行卡里的余額……瞬間心情就不美麗了。最近,2023年高校畢業生就業調查顯示,本科畢業月平均起薪為5825元。調查一出,便有很多同學表示自己又被平均了。看著這一資料,不免讓人想到前不久中國青年報的一項調查:近六成大學生認為畢業10年內會 ......

    uj5u.com 2023-04-20 07:44:00 more
  • 最新版本 Stable Diffusion 開源 AI 繪畫工具之中文自動提詞篇

    🎈 標簽生成器 由于輸入正向提示詞 prompt 和反向提示詞 negative prompt 都是使用英文,所以對學習母語的我們非常不友好 使用網址:https://tinygeeker.github.io/p/ai-prompt-generator 這個網址是為了讓大家在使用 AI 繪畫的時候 ......

    uj5u.com 2023-04-20 07:43:36 more
  • 漫談前端自動化測驗演進之路及測驗工具分析

    隨著前端技術的不斷發展和應用程式的日益復雜,前端自動化測驗也在不斷演進。隨著 Web 應用程式變得越來越復雜,自動化測驗的需求也越來越高。如今,自動化測驗已經成為 Web 應用程式開發程序中不可或缺的一部分,它們可以幫助開發人員更快地發現和修復錯誤,提高應用程式的性能和可靠性。 ......

    uj5u.com 2023-04-20 07:43:16 more
  • CANN開發實踐:4個DVPP記憶體問題的典型案例解讀

    摘要:由于DVPP媒體資料處理功能對存放輸入、輸出資料的記憶體有更高的要求(例如,記憶體首地址128位元組對齊),因此需呼叫專用的記憶體申請介面,那么本期就分享幾個關于DVPP記憶體問題的典型案例,并給出原因分析及解決方法。 本文分享自華為云社區《FAQ_DVPP記憶體問題案例》,作者:昇騰CANN。 DVPP ......

    uj5u.com 2023-04-20 07:43:03 more
  • msf學習

    msf學習 以kali自帶的msf為例 一、msf核心模塊與功能 msf模塊都放在/usr/share/metasploit-framework/modules目錄下 1、auxiliary 輔助模塊,輔助滲透(埠掃描、登錄密碼爆破、漏洞驗證等) 2、encoders 編碼器模塊,主要包含各種編碼 ......

    uj5u.com 2023-04-20 07:42:59 more
  • Halcon軟體安裝與界面簡介

    1. 下載Halcon17版本到到本地 2. 雙擊安裝包后 3. 步驟如下 1.2 Halcon軟體安裝 界面分為四大塊 1. Halcon的五個助手 1) 影像采集助手:與相機連接,設定相機引數,采集影像 2) 標定助手:九點標定或是其它的標定,生成標定檔案及內參外參,可以將像素單位轉換為長度單位 ......

    uj5u.com 2023-04-20 07:42:17 more
  • 在MacOS下使用Unity3D開發游戲

    第一次發博客,先發一下我的游戲開發環境吧。 去年2月份買了一臺MacBookPro2021 M1pro(以下簡稱mbp),這一年來一直在用mbp開發游戲。我大致分享一下我的開發工具以及使用體驗。 1、Unity 官網鏈接: https://unity.cn/releases 我一般使用的Apple ......

    uj5u.com 2023-04-20 07:40:19 more