主頁 > 後端開發 > 跟著原始碼看lcn分布式事務

跟著原始碼看lcn分布式事務

2020-09-14 03:00:10 後端開發

跟著原始碼看lcn分布式事務

lcn分布式事務具體實作思路是服務器A創建事務,構建事務資訊并將事務資訊發送到事務處理器,處理程序中可能用到服務器B、C,會將事務組Id傳給B、C,B、C業務處理完成后將事務資訊加入到對應的事務組,并且創建一個執行緒對事務組狀態進行檢測A事務處理完成后向服務其發送的事務狀態,B、C會根據監督的狀態判斷對事務進行具體的提交或者回滾操作,

分布式事務流程圖

在這里插入圖片描述

lcn分布式事務是通過切面進行實作,lcn內部共有兩個aop切面,分別是DataSourceAspect和TransactionAspect,DataSourceAspect攔截對應的資料庫連接(connection),TransactionAspect進行具體的分布式事務的實作,這里事務的開始是通過TransactionAspect進行開始,這里僅截取lcn部分代碼去介紹,

private final DTXLogicWeaver dtxLogicWeaver;

@Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)")
public void lcnTransactionPointcut() {
}    

@Around("lcnTransactionPointcut() && !txcTransactionPointcut()" +
        "&& !tccTransactionPointcut() && !txTransactionPointcut()")
public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
    DTXInfo dtxInfo = DTXInfo.getFromCache(point);
    LcnTransaction lcnTransaction = dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
    dtxInfo.setTransactionType(Transactions.LCN);
    dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
    return dtxLogicWeaver.runTransaction(dtxInfo, point::proceed);
}

事務攔截器通過攔截帶有LcnTransaction注解的方法后進行攔截處理,然后啟動事務runTransaction方法啟動具體的分布式事務實作類,

public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {

    log.debug("<---- TxLcn start ---->");
  	//這里獲取dtxLocalContext,創建事務時新建,每個事務之只能有一個存在
    DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
    TxContext txContext;
    // ---------- 保證每個模塊在一個DTX下只會有一個TxContext ---------- //
  	//判斷是否存在事務資訊,沒有啟動事務,有加入到事務
    if (globalContext.hasTxContext()) {
        // 有事務背景關系的獲取事務背景關系
        txContext = globalContext.txContext();
        dtxLocalContext.setInGroup(true);
        log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
        // 本地事務呼叫
        if (Objects.nonNull(dtxLocalContext.getGroupId())) {
            dtxLocalContext.setDestroy(false);
        }
    } else {
        // 沒有的開啟本地事務背景關系
        txContext = globalContext.startTx();
        dtxLocalContext.setInGroup(false);
    }

    dtxLocalContext.setUnitId(dtxInfo.getUnitId());
    dtxLocalContext.setGroupId(txContext.getGroupId());
    dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());

    // 事務引數
    TxTransactionInfo info = new TxTransactionInfo();
    info.setBusinessCallback(business);
    info.setGroupId(txContext.getGroupId());
    info.setUnitId(dtxInfo.getUnitId());
    info.setPointMethod(dtxInfo.getBusinessMethod());
    info.setPropagation(dtxInfo.getTransactionPropagation());
    info.setTransactionInfo(dtxInfo.getTransactionInfo());
    info.setTransactionType(dtxInfo.getTransactionType());
    info.setTransactionStart(txContext.isDtxStart());

    //LCN事務處理器
    try {
        return transactionServiceExecutor.transactionRunning(info);
    } finally {
        // 執行緒執行業務完畢清理本地資料
        if (dtxLocalContext.isDestroy()) {
            // 通知事務執行完畢
            synchronized (txContext.getLock()) {
                txContext.getLock().notifyAll();
            }

            // TxContext生命周期是? 和事務組一樣(不與具體模塊相關的)
            if (!dtxLocalContext.isInGroup()) {
                globalContext.destroyTx();
            }
        }
        if(info.isTransactionStart() && info.getPropagation().equals(DTXPropagation.REQUIRES_NEW)) {
            DTXLocalContext.makeNeverAppeared();
            TracingContext.tracing().destroy();
        }
        log.debug("<---- TxLcn end ---->");
    }
}

這里會存在一個unitId代表每個服務的唯一性代碼,如果存在直接回傳,如果不存在則新建,是從DTXInfo中獲取,通過Transactions的unitId方式生成(具體代碼放在了業務代碼里,不方便貼出來),

//根據方法名生成unitId然后去dtxInfoCache中獲取,存在直接回傳,不存在存入快取中然后回傳
public static DTXInfo getFromCache(MethodInvocation methodInvocation) {
    String signature = methodInvocation.getMethod().toString();
    String unitId = Transactions.unitId(signature);
    DTXInfo dtxInfo = dtxInfoCache.get(unitId);
    if (Objects.isNull(dtxInfo)) {
        dtxInfo = new DTXInfo(methodInvocation.getMethod(),
                methodInvocation.getArguments(), methodInvocation.getThis().getClass());
        dtxInfoCache.put(unitId, dtxInfo);
    }
    dtxInfo.reanalyseMethodArgs(methodInvocation.getArguments());
    return dtxInfo;
}
//md5根據方法的signature進行計算得到
public static String unitId(String methodSignature) {
    return DigestUtils.md5DigestAsHex((APPLICATION_ID_WHEN_RUNNING + methodSignature).getBytes());
}

DTXLocalContext.getOrNew(),會獲取一個本地事務背景關系資訊,由于同一個事務可能多次用到,這里使用的時getOrNew當第一次使用使用直接新建,并保存在ThreadLocal變數currentLocal里,否則回傳已經存在的,這里是為了保證每個請求對應獲取到的是同一個類,避免并發問題引起分布式事務問題,

private final static ThreadLocal<Map<String, DTXLocalContext>> currentLocal = new InheritableThreadLocal<>();

public static DTXLocalContext getOrNew() {
    //這里是dataSource的名稱,可能是動態資料源,系統不同獲取就不同,這里把本系統的代碼去掉了
    String dataSourceName = "dataSource";
    if (currentLocal.get() == null) {
      	//存在背景關系資訊,當時不存在對應的dataSource的名稱,針對多資料源
        currentLocal.set(new ConcurrentHashMap<String, DTXLocalContext>());
    }
  	//首先獲取背景關系資訊
    DTXLocalContext dtxLocalContext = currentLocal.get().get(dataSourceName);
    if(null == dtxLocalContext) {
      	// 不存在直接新建事務背景關系并保存到本地集合
        dtxLocalContext = new DTXLocalContext();
        currentLocal.get().put(dataSourceName, dtxLocalContext);
    }
    //回傳背景關系資訊
    return dtxLocalContext;
}

globalContext.startTx(),啟動事務生成一個groupId然后將其存入ThreadLocal中,

public TxContext startTx() {
    TxContext txContext = new TxContext();
    // 事務發起方判斷,這里判斷是否有事務根據事務組id(groupId)進行判斷
    txContext.setDtxStart(!TracingContext.tracing().hasGroup());
    if (txContext.isDtxStart()) {
      	//沒有啟動事務則啟動事務
        TracingContext.tracing().beginTransactionGroup();
    }
    txContext.setGroupId(TracingContext.tracing().groupId());
    String txContextKey = txContext.getGroupId() + ".dtx";
    attachmentCache.attach(txContextKey, txContext);
    log.debug("Start TxContext[{}]", txContext.getGroupId());
    return txContext;
}

新建事務時會存在一個groupId代表整個事務唯一id,當事務不存在的情況下(根據groupId進行判斷,tracingContextThreadLocal中不存在groupId則不存在,否則就是存在,不存在的初始化一個,會隨機生成一個groupId,在tracingContextThreadLocal中加入初始化的事務資訊)

private static ThreadLocal<TracingContext> tracingContextThreadLocal = new ThreadLocal<>();

public static TracingContext tracing() {
    if (tracingContextThreadLocal.get() == null) {
        tracingContextThreadLocal.set(new TracingContext());
    }
    return tracingContextThreadLocal.get();
}
    

public void beginTransactionGroup() {
    if (hasGroup()) {
        return;
    }
  	//不存在事務組的情況下,初始化一個,gourpId是一個亂數
    init(Maps.newHashMap(TracingConstants.GROUP_ID, RandomUtils.randomKey(), TracingConstants.APP_MAP, "{}"));
}

public static void init(Map<String, String> initFields) {
    // 將生成的TracingContext資訊加入到tracingContextThreadLocal
    if (Objects.isNull(initFields)) {
        log.warn("init tracingContext fail. null init fields.");
        return;
    }
    TracingContext tracingContext = tracing();
    if (Objects.isNull(tracingContext.fields)) {
        tracingContext.fields = new HashMap<>();
    }
    tracingContext.fields.putAll(initFields);
}

具體處理邏輯在這里,這里會有事務的傳播行為處理propagationResolver.resolvePropagationState(info),事務處理前操作(向服務端發送請求,將事務加入到事務組)dtxLocalControl.preBusinessCode(info)、具體的業務處理dtxLocalControl.doBusinessCode(info)、業務成功后的操作dtxLocalControl.onBusinessCodeSuccess(info, result)、業務執行失敗操作dtxLocalControl.onBusinessCodeError(info, e)、業務結束操作dtxLocalControl.postBusinessCode(info),

public Object transactionRunning(TxTransactionInfo info) throws Throwable {

        // 1. 獲取事務型別
        String transactionType = info.getTransactionType();

        // 2. 獲取事務傳播狀態
        DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);

        // 2.1 如果不參與分布式事務立即終止
        if (propagationState.isIgnored()) {
            return info.getBusinessCallback().call();
        }

        // 3. 獲取本地分布式事務控制器
        DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);

        // 4. 織入事務操作
        try {
            // 4.1 記錄事務型別到事務背景關系
            Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
            transactionTypeSet.add(transactionType);

            dtxLocalControl.preBusinessCode(info);

            // 4.2 業務執行前
            txLogger.txTrace(
                    info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);

            // 4.3 執行業務
            Object result = dtxLocalControl.doBusinessCode(info);

            // 4.4 業務執行成功
            txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
            dtxLocalControl.onBusinessCodeSuccess(info, result);
            return result;
        } catch (TransactionException e) {
            txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
            throw e;
        } catch (Throwable e) {
            // 4.5 業務執行失敗
            txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
                    "business code error");
            dtxLocalControl.onBusinessCodeError(info, e);
            throw e;
        } finally {
            // 4.6 業務執行完畢
            dtxLocalControl.postBusinessCode(info);
        }
    }

傳播行為處理操作,具體處理是根據傳遞的屬性判斷、將事務加入到事務組、新建事務或直接不加入事務,這里不介紹具體的傳播行為,不理解的可以執行查找了解下,

public DTXPropagationState resolvePropagationState(TxTransactionInfo txTransactionInfo) throws TransactionException {

        // 本地已在DTX,根據事務傳播,靜默加入
        if (DTXLocalContext.cur().isInGroup()) {
            log.debug("SILENT_JOIN group!");
            return DTXPropagationState.SILENT_JOIN;
        }

        // 發起方之前沒有事務
        if (txTransactionInfo.isTransactionStart()) {
            // 根據事務傳播,對于 SUPPORTS 不參與事務
            if (DTXPropagation.SUPPORTS.equals(txTransactionInfo.getPropagation())) {
                return DTXPropagationState.NON;
            }
            // 根據事務傳播,創建事務
            return DTXPropagationState.CREATE;
        }

        // 已經存在DTX,根據事務傳播,加入
        return DTXPropagationState.JOIN;
    }

業務處理前操作dtxLocalControl.preBusinessCode(info),這里dtxLocalControl有兩個實作類,分別對應創建事務LcnStartingTransaction和加入事務LcnRunningTransaction,這里使用的是創建事務,

public class LcnStartingTransaction implements DTXLocalControl {

    private final TransactionControlTemplate transactionControlTemplate;

    private final TCGlobalContext globalContext;


@Autowired
public LcnStartingTransaction(TransactionControlTemplate transactionControlTemplate, TCGlobalContext globalContext) {
 	this.transactionControlTemplate = transactionControlTemplate;
    this.globalContext = globalContext;
}

    @Override
    public void preBusinessCode(TxTransactionInfo info) throws TransactionException {
        // 創建事務操作
        transactionControlTemplate.createGroup(
                info.getGroupId(), info.getUnitId(), info.getTransactionInfo(), info.getTransactionType());
        DTXLocalContext.makeProxy();
    }

    @Override
    public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
        DTXLocalContext.cur().setSysTransactionState(0);
    }

    @Override
    public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) {
        DTXLocalContext.cur().setSysTransactionState(1);
    }

    @Override
    public void postBusinessCode(TxTransactionInfo info) {
        // 這里是通知事務完成
        transactionControlTemplate.notifyGroup(
                info.getGroupId(), info.getUnitId(), info.getTransactionType(),
                DTXLocalContext.transactionState(globalContext.dtxState(info.getGroupId())));
		DTXLocalContext.makeNeverAppeared();
    }
}

創建事務組,向事務服務器發送事務,TransactionControlTemplate通過createGroup方法想事務服務其發送事務,

public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
            throws TransactionException {
        //創建事務組
        try {
            // 日志
            txLogger.txTrace(groupId, unitId,
                    "create group > transaction type: {}", transactionType);
            // 創建事務組訊息,想服務端發送服務請求
            reliableMessenger.createGroup(groupId);
            // 快取發起方切面資訊
            aspectLogger.trace(groupId, unitId, transactionInfo);
        } catch (RpcException e) {
            // 通訊例外
            dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
        } catch (LcnBusinessException e) {
            // 創建事務組業務失敗
            dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
        }
        txLogger.txTrace(groupId, unitId, "create group over");
    }
	
	//向服務器端發送請求創建事務
    @Override
    public void createGroup(String groupId) throws RpcException, LcnBusinessException {
        // TxManager創建事務組
        MessageDto messageDto = request(MessageCreator.createGroup(groupId));
        if (!MessageUtils.statusOk(messageDto)) {
            throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
        }
    }

通知事務完成,向服務器發送請求通知事務完成,這里是由于非事務創建者會有一個執行緒來獲取該狀態進行回滾使用,

@Override
public int notifyGroup(String groupId, int transactionState) throws RpcException, LcnBusinessException {
    NotifyGroupParams notifyGroupParams = new NotifyGroupParams();
    notifyGroupParams.setGroupId(groupId);
    notifyGroupParams.setState(transactionState);
    //具體的發送請求資訊,這里不詳細介紹
    MessageDto messageDto = request0(MessageCreator.notifyGroup(notifyGroupParams),
            clientConfig.getTmRpcTimeout() * clientConfig.getChainLevel());
    // 成功清理發起方事務
    if (!MessageUtils.statusOk(messageDto)) {
        throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
    }
    return messageDto.loadBean(Integer.class);
}

這里是通知事務完成,會向服務器發送一個資訊,來告訴事務服務器事務處理完成,成功了或者失敗了,

@Override
public int notifyGroup(String groupId, int transactionState) throws RpcException, LcnBusinessException {
    NotifyGroupParams notifyGroupParams = new NotifyGroupParams();
    notifyGroupParams.setGroupId(groupId);
    notifyGroupParams.setState(transactionState);
    MessageDto messageDto = request0(MessageCreator.notifyGroup(notifyGroupParams),
            clientConfig.getTmRpcTimeout() * clientConfig.getChainLevel());
    // 成功清理發起方事務
    if (!MessageUtils.statusOk(messageDto)) {
        throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
    }
    return messageDto.loadBean(Integer.class);
}

具體的事務處理是直接呼叫業務代碼,這里就不介紹了,下connect切面DataSourceAspect,業務中每次獲取資料庫連接都通過切面進行處理,

@Around("execution(* javax.sql.DataSource.getConnection(..))")
    public Object around(ProceedingJoinPoint point) throws Throwable {
        return dtxResourceWeaver.getConnection(() -> (Connection) point.proceed());
    }

切面的具體處理是將connection通過TransactionResourceProxy代理類進行處理,然后統一回傳代理類處理邏輯如下,

public Object getConnection(ConnectionCallback connectionCallback) throws Throwable {
   DTXLocalContext dtxLocalContext = DTXLocalContext.cur();
   if (Objects.nonNull(dtxLocalContext) && dtxLocalContext.isProxy()) {
      String transactionType = dtxLocalContext.getTransactionType();
      TransactionResourceProxy resourceProxy = txLcnBeanHelper.loadTransactionResourceProxy(transactionType);
     //這里是創建連接或獲取代理類,并將其存入的集合中(key使用groupId)
      Connection connection = resourceProxy.proxyConnection(connectionCallback);
      if(!connection.isClosed()) {
         if(connection instanceof LcnConnectionProxy) {
            log.debug("proxy a LcnConnectionProxy connection: {}.", ((LcnConnectionProxy)connection).getTarget());
         }
         else {
            log.debug("proxy a sql connection: {}.", connection);
         }
         return connection;
      }
   }
   return connectionCallback.call();
}

這里是資料庫連接類的處理會從globalContext獲取一個連接,如果沒有將新建一個連接然后加入到對應的ConcurrentHashMap中(這里有groupId做key不用擔心存在多執行緒問題),這里是為后面提交和回滾操作做鋪墊,

@Override
public Connection proxyConnection(ConnectionCallback connectionCallback) throws Throwable {
    String groupId = DTXLocalContext.cur().getGroupId();
    try {
      	//通過背景關系獲取資料庫連接
        return globalContext.getLcnConnection(groupId, DynamicDataSourceHolder.getDataSource());
    } catch (TCGlobalContextException e) {
      	//如果不存在連接,直接新建一個連接,將自動提交設定成否,然后加入到ConcurrentHashMap中,這里是為后續操作做鋪墊
        LcnConnectionProxy lcnConnectionProxy = new LcnConnectionProxy(connectionCallback.call());
        globalContext.setLcnConnection(groupId, DynamicDataSourceHolder.getDataSource(), lcnConnectionProxy);
        lcnConnectionProxy.setAutoCommit(false);
        return lcnConnectionProxy;
    }
}

加入事務,這里通過類LcnRunningTransaction進行處理,這里是將preBusinessCode方法中創建事務去掉,然后將onBusinessCodeSuccess的發送事務成功的方法方法改成事務向事務服務器發送加入事務組請求,其內同步添加異步檢測方法,

public class LcnRunningTransaction implements DTXLocalControl {
    
    private final TransactionCleanTemplate transactionCleanTemplate;
    
    private final TransactionControlTemplate transactionControlTemplate;
    
    @Autowired
    public LcnRunningTransaction(TransactionCleanTemplate transactionCleanTemplate,
                                 TransactionControlTemplate transactionControlTemplate) {
        this.transactionCleanTemplate = transactionCleanTemplate;
        this.transactionControlTemplate = transactionControlTemplate;
    }
    
    
    @Override
    public void preBusinessCode(TxTransactionInfo info) {
        DTXLocalContext.makeProxy();
    }
    
    
    @Override
    public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
        try {
          	//呼叫事務的清理方法
            transactionCleanTemplate.clean(info.getGroupId(), info.getUnitId(), info.getTransactionType(), 0);
        } catch (TransactionClearException e) {
            log.error("{} > clean transaction error." , Transactions.LCN);
        }
    }
    
    
    @Override
    public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) throws TransactionException {
        log.debug("join group: [GroupId: {},Method: {}]" , info.getGroupId(),
                info.getTransactionInfo().getMethodStr());
        
        //加入事務組
        transactionControlTemplate.joinGroup(info.getGroupId(), info.getUnitId(), info.getTransactionType(),
                info.getTransactionInfo());
    }
    
}

非事務服務器創建成功后,將其向事務服務器發請求,告訴事務服務器該服務器事務處理完成,將其加入事務組,加入成功后會有一個發起一個執行緒進行監督事務事務是否處理完成,對應的是事務創建服務器中的發送狀態,

public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo)
            throws TransactionException {
        try {
            txLogger.txTrace(groupId, unitId, "join group > transaction type: {}", transactionType);
			//這里將事務加入到事務組
            reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState(globalContext.dtxState(groupId)));

            txLogger.txTrace(groupId, unitId, "join group message over.");

            // 異步檢測,在這里檢測事務是否處理完成
            dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);

            // 快取參與方切面資訊
            aspectLogger.trace(groupId, unitId, transactionInfo);
        } catch (RpcException e) {
            dtxExceptionHandler.handleJoinGroupMessageException(Arrays.asList(groupId, unitId, transactionType), e);
        } catch (LcnBusinessException e) {
            dtxExceptionHandler.handleJoinGroupBusinessException(Arrays.asList(groupId, unitId, transactionType), e);
        }
        txLogger.txTrace(groupId, unitId, "join group logic over");
    }

這里是具體的檢測方法,這里是通過執行緒池進行定時向服務器發送請求,獲取事物的狀態,即事物的創建者是否完成事物的創建,

public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
        txLogger.taskTrace(groupId, unitId, "start delay checking task");
        ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
            try {
                TxContext txContext = globalContext.txContext(groupId);
                if (Objects.nonNull(txContext)) {
                    synchronized (txContext.getLock()) {
                        txLogger.taskTrace(groupId, unitId, "checking waiting for business code finish.");
                        txContext.getLock().wait();
                    }
                }
              	//這里是發送事務請求,查看是否發送成功
                int state = reliableMessenger.askTransactionState(groupId, unitId);
                txLogger.taskTrace(groupId, unitId, "ask transaction state {}", state);
                if (state == -1) {
                    txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
                  	//這里是發送請求后,事務失敗后的操作
                    onAskTransactionStateException(groupId, unitId, transactionType);
                } else {
                  	//這里是事務處理成功的操作
                    transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
                    aspectLogger.clearLog(groupId, unitId);
                }

            } catch (RpcException e) {
                onAskTransactionStateException(groupId, unitId, transactionType);
            } catch (TransactionClearException | InterruptedException e) {
                txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
            }
        }, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);
        delayTasks.put(groupId + unitId, scheduledFuture);
    }

事務成功后操作具體的介面為TransactionCleanService,這里對應的是三個實作類,對應三種事務處理方法,我們的是lcn,通過groupId取到資料源切面的連接代理類,



@Override
    public void clear(String groupId, int state, String unitId, String unitType) throws TransactionClearException {
        try {
        	//這里是獲取資料庫連接操作,跟上面的事務切面相對應,同時groupId取得事務組連接
            Set<LcnConnectionProxy> connectionProxy = globalContext.getLcnConnection(groupId);
            //取得的連接進行提交或者回滾操作
            connectionProxy.forEach(con -> con.notify(state));
            // todo notify exception
        } catch (TCGlobalContextException e) {
            log.warn("Non lcn connection when clear transaction.");
        } finally {
			DTXLocalContext.makeNeverAppeared();
		}
    }

這里是具體的事務成功或者失敗,進行提交和回滾的操作,這里具體的類為LcnConnectionProxy,對應事務的成功操作后的提交和回滾操作,這里的connection適合DataSourceAspect生成的連接是同一個,之前是通過groupId存入ConcurrentHashMap,這里再通過groupId取出對應的資料庫連接,

public RpcResponseState notify(int state) {
        try {
        	if(!connection.isClosed()) {
        		if (state == 1) {
        			log.debug("commit transaction type[lcn] proxy connection:{}.", connection);
        			//事務成功進行提交操作
        			connection.commit();
        		} else {
        			log.debug("rollback transaction type[lcn] proxy connection:{}.", connection);
        			//事務失敗進行回滾操作
        			connection.rollback();
        		}
        		connection.close();
        	}
            log.debug("transaction type[lcn] proxy connection:{} closed.", connection);
            return RpcResponseState.success;
        } catch (Exception e) {
            log.error(e.getLocalizedMessage(), e);
            return RpcResponseState.fail;
        }
    }

這里是事務處理失敗的情況,是將事務發送狀態設定為失敗,然后呼叫上面的clean命令進行回滾,失敗和成功最終都會呼叫clear方法進行事物的處理,失敗釋放在例外中進行處理,但是失敗會分兩種情況,是否創建過connection,創建了就進行回滾,未創建則不需要,這里可能會存在多種情況(由于發生例外的情況不同導致,有些是可以提交的,有些不能)這里不詳細介紹,具體思路即能提交的就調將狀態設定成1,然后clear方法進行提交,不能提交的則將狀態設定成非1,

@Override
    public void handleNotifyGroupBusinessException(Object params, Throwable ex) {
        List paramList = (List) params;
        String groupId = (String) paramList.get(0);
        int state = (int) paramList.get(1);
        String unitId = (String) paramList.get(2);
        String transactionType = (String) paramList.get(3);

        if(null != ex) {
            //用戶強制回滾.
            if (ex instanceof UserRollbackException) {
                state = 0;
            }
            if ((ex.getCause() != null && ex.getCause() instanceof UserRollbackException)) {
                state = 0;
            }
        }
        else {
            // LCN例外
            state = 0;
        }

        // 結束事務
        try {
            transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
        } catch (TransactionClearException e) {
            txLogger.error(groupId, unitId, "notify group", "{} > clean transaction error.", transactionType);
        }
    }

事物的groupId傳遞,Tracings這里通過將請求頭進行處理,這里是通過restTemplate攔截器進行處理(其它方式如dubbo、feign可自行查看原始碼),具體思路是事務創建者將groupId存入請求頭,事務的加入者會根據請求頭來獲取groupId然后根據groupId初始化TracingContext,

public class RestTemplateTracingTransmitter implements ClientHttpRequestInterceptor {

    @Autowired
    public RestTemplateTracingTransmitter(@Autowired(required = false) List<RestTemplate> restTemplates) {
        if (Objects.nonNull(restTemplates)) {
            restTemplates.forEach(restTemplate -> {
                List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
                interceptors.add(interceptors.size(), RestTemplateTracingTransmitter.this);
            });
        }
    }

    @Override
    @NonNull
    public ClientHttpResponse intercept(
            @NonNull HttpRequest httpRequest, @NonNull byte[] bytes,
            @NonNull ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
      	//這里是Lambda運算式寫法httpRequest.getHeaders()::add是對TracingSetter的set方法的實作
        Tracings.transmit(httpRequest.getHeaders()::add);
        return clientHttpRequestExecution.execute(httpRequest, bytes);
    }
}
//這里springMvc的配置類,可以進行具體業務處理前的一些準備作業如解碼
public class SpringTracingApplier implements com.codingapi.txlcn.tracing.http.spring.HandlerInterceptor, WebMvcConfigurer {

    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
      	//這里是Lambda運算式寫法httpRequest.getHeaders()::add是對TracingGetter的get方法的實作
        Tracings.apply(request::getHeader);
        return true;
    }
	//將攔截器加入到容器
    @Override
    public void addInterceptors(InterceptorRegistry registry) {
        registry.addInterceptor(this);
    }
}


public class Tracings {
  	//私有構造器,保證了該方法不會被外部直接構建,確保gourpId能夠被正確傳遞
    private Tracings() {
    }

    public static void transmit(TracingSetter tracingSetter) {
      	//請求發起前,將groupId存入報文頭
        if (TracingContext.tracing().hasGroup()) {
            log.debug("tracing transmit group:{}", TracingContext.tracing().groupId());
            tracingSetter.set(TracingConstants.HEADER_KEY_GROUP_ID, TracingContext.tracing().groupId());
            tracingSetter.set(TracingConstants.HEADER_KEY_APP_MAP,									     									Base64Utils.encodeToString(TracingContext.tracing().                                                                           					appMapString().getBytes(StandardCharsets.UTF_8)));
        }
    }

    public static void apply(TracingGetter tracingGetter) {
        String groupId = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_GROUP_ID)).orElse("");
        String appList = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_APP_MAP)).orElse("");
      	//這里和之前的init方法一致,這里是通過獲取報文頭的獲取groupId進行初始化,保證事務創建者的groupId能傳過來
        TracingContext.init(Maps.newHashMap(TracingConstants.GROUP_ID, groupId, TracingConstants.APP_MAP,
                StringUtils.isEmpty(appList) ? appList : new String(Base64Utils.decodeFromString(appList), 					   StandardCharsets.UTF_8)));
        if (TracingContext.tracing().hasGroup()) {
            log.debug("tracing apply group:{}, app map:{}", groupId, appList);
        }
    }

    public interface TracingSetter {
        void set(String key, String value);
    }


    public interface TracingGetter {
        String get(String key);
    }
}

這是lcn分布式事務的處理的具體思路,這里介紹的還是有點缺陷的,事物的補償并未考慮(概率很小),即當事務全部處理完成后,事務服務器收到事務創建者的資訊后,事務服務器或者后面用到的服務器夯機如何處理,也就是創建者服務器的事務提交了,其他服務器的事務提交不了,我這里想法是通過redis對應事務組資訊去進行補償處理,各位小伙伴也可以考慮下如何保證進行 補償或者如何保證事務的強一致性,

轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/29776.html

標籤:java

上一篇:JZ54 字符流中第一個不重復的字符

下一篇:jvisualvm如何安裝Visual GC插件

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【C++】Microsoft C++、C 和匯編程式檔案

    ......

    uj5u.com 2020-09-10 00:57:23 more
  • 例外宣告

    相比于斷言適用于排除邏輯上不可能存在的狀態,例外通常是用于邏輯上可能發生的錯誤。 例外宣告 Item 1:當函式不可能拋出例外或不能接受拋出例外時,使用noexcept 理由 如果不打算拋出例外的話,程式就會認為無法處理這種錯誤,并且應當盡早終止,如此可以有效地阻止例外的傳播與擴散。 示例 //不可 ......

    uj5u.com 2020-09-10 00:57:27 more
  • Codeforces 1400E Clear the Multiset(貪心 + 分治)

    鏈接:https://codeforces.com/problemset/problem/1400/E 來源:Codeforces 思路:給你一個陣列,現在你可以進行兩種操作,操作1:將一段沒有 0 的區間進行減一的操作,操作2:將 i 位置上的元素歸零。最終問:將這個陣列的全部元素歸零后操作的最少 ......

    uj5u.com 2020-09-10 00:57:30 more
  • UVA11610 【Reverse Prime】

    本人看到此題沒有翻譯,就附帶了一個自己的翻譯版本 思考 這一題,它的第一個要求是找出所有 $7$ 位反向質數及其質因數的個數。 我們應該需要質數篩篩選1~$10^{7}$的所有數,這里就不慢慢介紹了。但是,重讀題,我們突然發現反向質數都是 $7$ 位,而將它反過來后的數字卻是 $6$ 位數,這就說明 ......

    uj5u.com 2020-09-10 00:57:36 more
  • 統計區間素數數量

    1 #pragma GCC optimize(2) 2 #include <bits/stdc++.h> 3 using namespace std; 4 bool isprime[1000000010]; 5 vector<int> prime; 6 inline int getlist(int ......

    uj5u.com 2020-09-10 00:57:47 more
  • C/C++編程筆記:C++中的 const 變數詳解,教你正確認識const用法

    1、C中的const 1、區域const變數存放在堆疊區中,會分配記憶體(也就是說可以通過地址間接修改變數的值)。測驗代碼如下: 運行結果: 2、全域const變數存放在只讀資料段(不能通過地址修改,會發生寫入錯誤), 默認為外部聯編,可以給其他源檔案使用(需要用extern關鍵字修飾) 運行結果: ......

    uj5u.com 2020-09-10 00:58:04 more
  • 【C++犯錯記錄】VS2019 MFC添加資源不懂如何修改資源宏ID

    1. 首先在資源視圖中,添加資源 2. 點擊新添加的資源,復制自動生成的ID 3. 在解決方案資源管理器中找到Resource.h檔案,編輯,使用整個專案搜索和替換的方式快速替換 宏宣告 4. Ctrl+Shift+F 全域搜索,點擊查找全部,然后逐個替換 5. 為什么使用搜索替換而不使用屬性視窗直 ......

    uj5u.com 2020-09-10 00:59:11 more
  • 【C++犯錯記錄】VS2019 MFC不懂的批量添加資源

    1. 打開資源頭檔案Resource.h,在其中預先定義好宏 ID(不清楚其實ID值應該設定多少,可以先新建一個相同的資源項,再在這個資源的ID值的基礎上遞增即可) 2. 在資源視圖中選中專案資源,按F7編輯資源檔案,按 ID 型別 相對路徑的形式添加 資源。(別忘了先把檔案拷貝到專案中的res檔案 ......

    uj5u.com 2020-09-10 01:00:19 more
  • C/C++編程筆記:關于C++的參考型別,專供新手入門使用

    今天要講的是C++中我最喜歡的一個用法——參考,也叫別名。 參考就是給一個變數名取一個變數名,方便我們間接地使用這個變數。我們可以給一個變數創建N個參考,這N + 1個變數共享了同一塊記憶體區域。(參考型別的變數會占用記憶體空間,占用的記憶體空間的大小和指標型別的大小是相同的。雖然參考是一個物件的別名,但 ......

    uj5u.com 2020-09-10 01:00:22 more
  • 【C/C++編程筆記】從頭開始學習C ++:初學者完整指南

    眾所周知,C ++的學習曲線陡峭,但是花時間學習這種語言將為您的職業帶來奇跡,并使您與其他開發人員區分開。您會更輕松地學習新語言,形成真正的解決問題的技能,并在編程的基礎上打下堅實的基礎。 C ++將幫助您養成良好的編程習慣(即清晰一致的編碼風格,在撰寫代碼時注釋代碼,并限制類內部的可見性),并且由 ......

    uj5u.com 2020-09-10 01:00:41 more
最新发布
  • Rust中的智能指標:Box<T> Rc<T> Arc<T> Cell<T> RefCell<T> Weak

    Rust中的智能指標是什么 智能指標(smart pointers)是一類資料結構,是擁有資料所有權和額外功能的指標。是指標的進一步發展 指標(pointer)是一個包含記憶體地址的變數的通用概念。這個地址參考,或 ” 指向”(points at)一些其 他資料 。參考以 & 符號為標志并借用了他們所 ......

    uj5u.com 2023-04-20 07:24:10 more
  • Java的值傳遞和參考傳遞

    值傳遞不會改變本身,參考傳遞(如果傳遞的值需要實體化到堆里)如果發生修改了會改變本身。 1.基本資料型別都是值傳遞 package com.example.basic; public class Test { public static void main(String[] args) { int ......

    uj5u.com 2023-04-20 07:24:04 more
  • [2]SpinalHDL教程——Scala簡單入門

    第一個 Scala 程式 shell里面輸入 $ scala scala> 1 + 1 res0: Int = 2 scala> println("Hello World!") Hello World! 檔案形式 object HelloWorld { /* 這是我的第一個 Scala 程式 * 以 ......

    uj5u.com 2023-04-20 07:23:58 more
  • 理解函式指標和回呼函式

    理解 函式指標 指向函式的指標。比如: 理解函式指標的偽代碼 void (*p)(int type, char *data); // 定義一個函式指標p void func(int type, char *data); // 宣告一個函式func p = func; // 將指標p指向函式func ......

    uj5u.com 2023-04-20 07:23:52 more
  • Django筆記二十五之資料庫函式之日期函式

    本文首發于公眾號:Hunter后端 原文鏈接:Django筆記二十五之資料庫函式之日期函式 日期函式主要介紹兩個大類,Extract() 和 Trunc() Extract() 函式作用是提取日期,比如我們可以提取一個日期欄位的年份,月份,日等資料 Trunc() 的作用則是截取,比如 2022-0 ......

    uj5u.com 2023-04-20 07:23:45 more
  • 一天吃透JVM面試八股文

    什么是JVM? JVM,全稱Java Virtual Machine(Java虛擬機),是通過在實際的計算機上仿真模擬各種計算機功能來實作的。由一套位元組碼指令集、一組暫存器、一個堆疊、一個垃圾回收堆和一個存盤方法域等組成。JVM屏蔽了與作業系統平臺相關的資訊,使得Java程式只需要生成在Java虛擬機 ......

    uj5u.com 2023-04-20 07:23:31 more
  • 使用Java接入小程式訂閱訊息!

    更新完微信服務號的模板訊息之后,我又趕緊把微信小程式的訂閱訊息給實作了!之前我一直以為微信小程式也是要企業才能申請,沒想到小程式個人就能申請。 訊息推送平臺🔥推送下發【郵件】【短信】【微信服務號】【微信小程式】【企業微信】【釘釘】等訊息型別。 https://gitee.com/zhongfuch ......

    uj5u.com 2023-04-20 07:22:59 more
  • java -- 緩沖流、轉換流、序列化流

    緩沖流 緩沖流, 也叫高效流, 按照資料型別分類: 位元組緩沖流:BufferedInputStream,BufferedOutputStream 字符緩沖流:BufferedReader,BufferedWriter 緩沖流的基本原理,是在創建流物件時,會創建一個內置的默認大小的緩沖區陣列,通過緩沖 ......

    uj5u.com 2023-04-20 07:22:49 more
  • Java-SpringBoot-Range請求頭設定實作視頻分段傳輸

    老實說,人太懶了,現在基本都不喜歡寫筆記了,但是網上有關Range請求頭的文章都太水了 下面是抄的一段StackOverflow的代碼...自己大修改過的,寫的注釋挺全的,應該直接看得懂,就不解釋了 寫的不好...只是希望能給視頻網站開發的新手一點點幫助吧. 業務場景:視頻分段傳輸、視頻多段傳輸(理 ......

    uj5u.com 2023-04-20 07:22:42 more
  • Windows 10開發教程_編程入門自學教程_菜鳥教程-免費教程分享

    教程簡介 Windows 10開發入門教程 - 從簡單的步驟了解Windows 10開發,從基本到高級概念,包括簡介,UWP,第一個應用程式,商店,XAML控制元件,資料系結,XAML性能,自適應設計,自適應UI,自適應代碼,檔案管理,SQLite資料庫,應用程式到應用程式通信,應用程式本地化,應用程式 ......

    uj5u.com 2023-04-20 07:22:35 more