跟著原始碼看lcn分布式事務
lcn分布式事務具體實作思路是服務器A創建事務,構建事務資訊并將事務資訊發送到事務處理器,處理程序中可能用到服務器B、C,會將事務組Id傳給B、C,B、C業務處理完成后將事務資訊加入到對應的事務組,并且創建一個執行緒對事務組狀態進行檢測A事務處理完成后向服務其發送的事務狀態,B、C會根據監督的狀態判斷對事務進行具體的提交或者回滾操作,
分布式事務流程圖

lcn分布式事務是通過切面進行實作,lcn內部共有兩個aop切面,分別是DataSourceAspect和TransactionAspect,DataSourceAspect攔截對應的資料庫連接(connection),TransactionAspect進行具體的分布式事務的實作,這里事務的開始是通過TransactionAspect進行開始,這里僅截取lcn部分代碼去介紹,
private final DTXLogicWeaver dtxLogicWeaver;
@Pointcut("@annotation(com.codingapi.txlcn.tc.annotation.LcnTransaction)")
public void lcnTransactionPointcut() {
}
@Around("lcnTransactionPointcut() && !txcTransactionPointcut()" +
"&& !tccTransactionPointcut() && !txTransactionPointcut()")
public Object runWithLcnTransaction(ProceedingJoinPoint point) throws Throwable {
DTXInfo dtxInfo = DTXInfo.getFromCache(point);
LcnTransaction lcnTransaction = dtxInfo.getBusinessMethod().getAnnotation(LcnTransaction.class);
dtxInfo.setTransactionType(Transactions.LCN);
dtxInfo.setTransactionPropagation(lcnTransaction.propagation());
return dtxLogicWeaver.runTransaction(dtxInfo, point::proceed);
}
事務攔截器通過攔截帶有LcnTransaction注解的方法后進行攔截處理,然后啟動事務runTransaction方法啟動具體的分布式事務實作類,
public Object runTransaction(DTXInfo dtxInfo, BusinessCallback business) throws Throwable {
log.debug("<---- TxLcn start ---->");
//這里獲取dtxLocalContext,創建事務時新建,每個事務之只能有一個存在
DTXLocalContext dtxLocalContext = DTXLocalContext.getOrNew();
TxContext txContext;
// ---------- 保證每個模塊在一個DTX下只會有一個TxContext ---------- //
//判斷是否存在事務資訊,沒有啟動事務,有加入到事務
if (globalContext.hasTxContext()) {
// 有事務背景關系的獲取事務背景關系
txContext = globalContext.txContext();
dtxLocalContext.setInGroup(true);
log.debug("Unit[{}] used parent's TxContext[{}].", dtxInfo.getUnitId(), txContext.getGroupId());
// 本地事務呼叫
if (Objects.nonNull(dtxLocalContext.getGroupId())) {
dtxLocalContext.setDestroy(false);
}
} else {
// 沒有的開啟本地事務背景關系
txContext = globalContext.startTx();
dtxLocalContext.setInGroup(false);
}
dtxLocalContext.setUnitId(dtxInfo.getUnitId());
dtxLocalContext.setGroupId(txContext.getGroupId());
dtxLocalContext.setTransactionType(dtxInfo.getTransactionType());
// 事務引數
TxTransactionInfo info = new TxTransactionInfo();
info.setBusinessCallback(business);
info.setGroupId(txContext.getGroupId());
info.setUnitId(dtxInfo.getUnitId());
info.setPointMethod(dtxInfo.getBusinessMethod());
info.setPropagation(dtxInfo.getTransactionPropagation());
info.setTransactionInfo(dtxInfo.getTransactionInfo());
info.setTransactionType(dtxInfo.getTransactionType());
info.setTransactionStart(txContext.isDtxStart());
//LCN事務處理器
try {
return transactionServiceExecutor.transactionRunning(info);
} finally {
// 執行緒執行業務完畢清理本地資料
if (dtxLocalContext.isDestroy()) {
// 通知事務執行完畢
synchronized (txContext.getLock()) {
txContext.getLock().notifyAll();
}
// TxContext生命周期是? 和事務組一樣(不與具體模塊相關的)
if (!dtxLocalContext.isInGroup()) {
globalContext.destroyTx();
}
}
if(info.isTransactionStart() && info.getPropagation().equals(DTXPropagation.REQUIRES_NEW)) {
DTXLocalContext.makeNeverAppeared();
TracingContext.tracing().destroy();
}
log.debug("<---- TxLcn end ---->");
}
}
這里會存在一個unitId代表每個服務的唯一性代碼,如果存在直接回傳,如果不存在則新建,是從DTXInfo中獲取,通過Transactions的unitId方式生成(具體代碼放在了業務代碼里,不方便貼出來),
//根據方法名生成unitId然后去dtxInfoCache中獲取,存在直接回傳,不存在存入快取中然后回傳
public static DTXInfo getFromCache(MethodInvocation methodInvocation) {
String signature = methodInvocation.getMethod().toString();
String unitId = Transactions.unitId(signature);
DTXInfo dtxInfo = dtxInfoCache.get(unitId);
if (Objects.isNull(dtxInfo)) {
dtxInfo = new DTXInfo(methodInvocation.getMethod(),
methodInvocation.getArguments(), methodInvocation.getThis().getClass());
dtxInfoCache.put(unitId, dtxInfo);
}
dtxInfo.reanalyseMethodArgs(methodInvocation.getArguments());
return dtxInfo;
}
//md5根據方法的signature進行計算得到
public static String unitId(String methodSignature) {
return DigestUtils.md5DigestAsHex((APPLICATION_ID_WHEN_RUNNING + methodSignature).getBytes());
}
DTXLocalContext.getOrNew(),會獲取一個本地事務背景關系資訊,由于同一個事務可能多次用到,這里使用的時getOrNew當第一次使用使用直接新建,并保存在ThreadLocal變數currentLocal里,否則回傳已經存在的,這里是為了保證每個請求對應獲取到的是同一個類,避免并發問題引起分布式事務問題,
private final static ThreadLocal<Map<String, DTXLocalContext>> currentLocal = new InheritableThreadLocal<>();
public static DTXLocalContext getOrNew() {
//這里是dataSource的名稱,可能是動態資料源,系統不同獲取就不同,這里把本系統的代碼去掉了
String dataSourceName = "dataSource";
if (currentLocal.get() == null) {
//存在背景關系資訊,當時不存在對應的dataSource的名稱,針對多資料源
currentLocal.set(new ConcurrentHashMap<String, DTXLocalContext>());
}
//首先獲取背景關系資訊
DTXLocalContext dtxLocalContext = currentLocal.get().get(dataSourceName);
if(null == dtxLocalContext) {
// 不存在直接新建事務背景關系并保存到本地集合
dtxLocalContext = new DTXLocalContext();
currentLocal.get().put(dataSourceName, dtxLocalContext);
}
//回傳背景關系資訊
return dtxLocalContext;
}
globalContext.startTx(),啟動事務生成一個groupId然后將其存入ThreadLocal中,
public TxContext startTx() {
TxContext txContext = new TxContext();
// 事務發起方判斷,這里判斷是否有事務根據事務組id(groupId)進行判斷
txContext.setDtxStart(!TracingContext.tracing().hasGroup());
if (txContext.isDtxStart()) {
//沒有啟動事務則啟動事務
TracingContext.tracing().beginTransactionGroup();
}
txContext.setGroupId(TracingContext.tracing().groupId());
String txContextKey = txContext.getGroupId() + ".dtx";
attachmentCache.attach(txContextKey, txContext);
log.debug("Start TxContext[{}]", txContext.getGroupId());
return txContext;
}
新建事務時會存在一個groupId代表整個事務唯一id,當事務不存在的情況下(根據groupId進行判斷,tracingContextThreadLocal中不存在groupId則不存在,否則就是存在,不存在的初始化一個,會隨機生成一個groupId,在tracingContextThreadLocal中加入初始化的事務資訊)
private static ThreadLocal<TracingContext> tracingContextThreadLocal = new ThreadLocal<>();
public static TracingContext tracing() {
if (tracingContextThreadLocal.get() == null) {
tracingContextThreadLocal.set(new TracingContext());
}
return tracingContextThreadLocal.get();
}
public void beginTransactionGroup() {
if (hasGroup()) {
return;
}
//不存在事務組的情況下,初始化一個,gourpId是一個亂數
init(Maps.newHashMap(TracingConstants.GROUP_ID, RandomUtils.randomKey(), TracingConstants.APP_MAP, "{}"));
}
public static void init(Map<String, String> initFields) {
// 將生成的TracingContext資訊加入到tracingContextThreadLocal
if (Objects.isNull(initFields)) {
log.warn("init tracingContext fail. null init fields.");
return;
}
TracingContext tracingContext = tracing();
if (Objects.isNull(tracingContext.fields)) {
tracingContext.fields = new HashMap<>();
}
tracingContext.fields.putAll(initFields);
}
具體處理邏輯在這里,這里會有事務的傳播行為處理propagationResolver.resolvePropagationState(info),事務處理前操作(向服務端發送請求,將事務加入到事務組)dtxLocalControl.preBusinessCode(info)、具體的業務處理dtxLocalControl.doBusinessCode(info)、業務成功后的操作dtxLocalControl.onBusinessCodeSuccess(info, result)、業務執行失敗操作dtxLocalControl.onBusinessCodeError(info, e)、業務結束操作dtxLocalControl.postBusinessCode(info),
public Object transactionRunning(TxTransactionInfo info) throws Throwable {
// 1. 獲取事務型別
String transactionType = info.getTransactionType();
// 2. 獲取事務傳播狀態
DTXPropagationState propagationState = propagationResolver.resolvePropagationState(info);
// 2.1 如果不參與分布式事務立即終止
if (propagationState.isIgnored()) {
return info.getBusinessCallback().call();
}
// 3. 獲取本地分布式事務控制器
DTXLocalControl dtxLocalControl = txLcnBeanHelper.loadDTXLocalControl(transactionType, propagationState);
// 4. 織入事務操作
try {
// 4.1 記錄事務型別到事務背景關系
Set<String> transactionTypeSet = globalContext.txContext(info.getGroupId()).getTransactionTypes();
transactionTypeSet.add(transactionType);
dtxLocalControl.preBusinessCode(info);
// 4.2 業務執行前
txLogger.txTrace(
info.getGroupId(), info.getUnitId(), "pre business code, unit type: {}", transactionType);
// 4.3 執行業務
Object result = dtxLocalControl.doBusinessCode(info);
// 4.4 業務執行成功
txLogger.txTrace(info.getGroupId(), info.getUnitId(), "business success");
dtxLocalControl.onBusinessCodeSuccess(info, result);
return result;
} catch (TransactionException e) {
txLogger.error(info.getGroupId(), info.getUnitId(), "before business code error");
throw e;
} catch (Throwable e) {
// 4.5 業務執行失敗
txLogger.error(info.getGroupId(), info.getUnitId(), Transactions.TAG_TRANSACTION,
"business code error");
dtxLocalControl.onBusinessCodeError(info, e);
throw e;
} finally {
// 4.6 業務執行完畢
dtxLocalControl.postBusinessCode(info);
}
}
傳播行為處理操作,具體處理是根據傳遞的屬性判斷、將事務加入到事務組、新建事務或直接不加入事務,這里不介紹具體的傳播行為,不理解的可以執行查找了解下,
public DTXPropagationState resolvePropagationState(TxTransactionInfo txTransactionInfo) throws TransactionException {
// 本地已在DTX,根據事務傳播,靜默加入
if (DTXLocalContext.cur().isInGroup()) {
log.debug("SILENT_JOIN group!");
return DTXPropagationState.SILENT_JOIN;
}
// 發起方之前沒有事務
if (txTransactionInfo.isTransactionStart()) {
// 根據事務傳播,對于 SUPPORTS 不參與事務
if (DTXPropagation.SUPPORTS.equals(txTransactionInfo.getPropagation())) {
return DTXPropagationState.NON;
}
// 根據事務傳播,創建事務
return DTXPropagationState.CREATE;
}
// 已經存在DTX,根據事務傳播,加入
return DTXPropagationState.JOIN;
}
業務處理前操作dtxLocalControl.preBusinessCode(info),這里dtxLocalControl有兩個實作類,分別對應創建事務LcnStartingTransaction和加入事務LcnRunningTransaction,這里使用的是創建事務,
public class LcnStartingTransaction implements DTXLocalControl {
private final TransactionControlTemplate transactionControlTemplate;
private final TCGlobalContext globalContext;
@Autowired
public LcnStartingTransaction(TransactionControlTemplate transactionControlTemplate, TCGlobalContext globalContext) {
this.transactionControlTemplate = transactionControlTemplate;
this.globalContext = globalContext;
}
@Override
public void preBusinessCode(TxTransactionInfo info) throws TransactionException {
// 創建事務操作
transactionControlTemplate.createGroup(
info.getGroupId(), info.getUnitId(), info.getTransactionInfo(), info.getTransactionType());
DTXLocalContext.makeProxy();
}
@Override
public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
DTXLocalContext.cur().setSysTransactionState(0);
}
@Override
public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) {
DTXLocalContext.cur().setSysTransactionState(1);
}
@Override
public void postBusinessCode(TxTransactionInfo info) {
// 這里是通知事務完成
transactionControlTemplate.notifyGroup(
info.getGroupId(), info.getUnitId(), info.getTransactionType(),
DTXLocalContext.transactionState(globalContext.dtxState(info.getGroupId())));
DTXLocalContext.makeNeverAppeared();
}
}
創建事務組,向事務服務器發送事務,TransactionControlTemplate通過createGroup方法想事務服務其發送事務,
public void createGroup(String groupId, String unitId, TransactionInfo transactionInfo, String transactionType)
throws TransactionException {
//創建事務組
try {
// 日志
txLogger.txTrace(groupId, unitId,
"create group > transaction type: {}", transactionType);
// 創建事務組訊息,想服務端發送服務請求
reliableMessenger.createGroup(groupId);
// 快取發起方切面資訊
aspectLogger.trace(groupId, unitId, transactionInfo);
} catch (RpcException e) {
// 通訊例外
dtxExceptionHandler.handleCreateGroupMessageException(groupId, e);
} catch (LcnBusinessException e) {
// 創建事務組業務失敗
dtxExceptionHandler.handleCreateGroupBusinessException(groupId, e.getCause());
}
txLogger.txTrace(groupId, unitId, "create group over");
}
//向服務器端發送請求創建事務
@Override
public void createGroup(String groupId) throws RpcException, LcnBusinessException {
// TxManager創建事務組
MessageDto messageDto = request(MessageCreator.createGroup(groupId));
if (!MessageUtils.statusOk(messageDto)) {
throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
}
}
通知事務完成,向服務器發送請求通知事務完成,這里是由于非事務創建者會有一個執行緒來獲取該狀態進行回滾使用,
@Override
public int notifyGroup(String groupId, int transactionState) throws RpcException, LcnBusinessException {
NotifyGroupParams notifyGroupParams = new NotifyGroupParams();
notifyGroupParams.setGroupId(groupId);
notifyGroupParams.setState(transactionState);
//具體的發送請求資訊,這里不詳細介紹
MessageDto messageDto = request0(MessageCreator.notifyGroup(notifyGroupParams),
clientConfig.getTmRpcTimeout() * clientConfig.getChainLevel());
// 成功清理發起方事務
if (!MessageUtils.statusOk(messageDto)) {
throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
}
return messageDto.loadBean(Integer.class);
}
這里是通知事務完成,會向服務器發送一個資訊,來告訴事務服務器事務處理完成,成功了或者失敗了,
@Override
public int notifyGroup(String groupId, int transactionState) throws RpcException, LcnBusinessException {
NotifyGroupParams notifyGroupParams = new NotifyGroupParams();
notifyGroupParams.setGroupId(groupId);
notifyGroupParams.setState(transactionState);
MessageDto messageDto = request0(MessageCreator.notifyGroup(notifyGroupParams),
clientConfig.getTmRpcTimeout() * clientConfig.getChainLevel());
// 成功清理發起方事務
if (!MessageUtils.statusOk(messageDto)) {
throw new LcnBusinessException(messageDto.loadBean(Throwable.class));
}
return messageDto.loadBean(Integer.class);
}
具體的事務處理是直接呼叫業務代碼,這里就不介紹了,下connect切面DataSourceAspect,業務中每次獲取資料庫連接都通過切面進行處理,
@Around("execution(* javax.sql.DataSource.getConnection(..))")
public Object around(ProceedingJoinPoint point) throws Throwable {
return dtxResourceWeaver.getConnection(() -> (Connection) point.proceed());
}
切面的具體處理是將connection通過TransactionResourceProxy代理類進行處理,然后統一回傳代理類處理邏輯如下,
public Object getConnection(ConnectionCallback connectionCallback) throws Throwable {
DTXLocalContext dtxLocalContext = DTXLocalContext.cur();
if (Objects.nonNull(dtxLocalContext) && dtxLocalContext.isProxy()) {
String transactionType = dtxLocalContext.getTransactionType();
TransactionResourceProxy resourceProxy = txLcnBeanHelper.loadTransactionResourceProxy(transactionType);
//這里是創建連接或獲取代理類,并將其存入的集合中(key使用groupId)
Connection connection = resourceProxy.proxyConnection(connectionCallback);
if(!connection.isClosed()) {
if(connection instanceof LcnConnectionProxy) {
log.debug("proxy a LcnConnectionProxy connection: {}.", ((LcnConnectionProxy)connection).getTarget());
}
else {
log.debug("proxy a sql connection: {}.", connection);
}
return connection;
}
}
return connectionCallback.call();
}
這里是資料庫連接類的處理會從globalContext獲取一個連接,如果沒有將新建一個連接然后加入到對應的ConcurrentHashMap中(這里有groupId做key不用擔心存在多執行緒問題),這里是為后面提交和回滾操作做鋪墊,
@Override
public Connection proxyConnection(ConnectionCallback connectionCallback) throws Throwable {
String groupId = DTXLocalContext.cur().getGroupId();
try {
//通過背景關系獲取資料庫連接
return globalContext.getLcnConnection(groupId, DynamicDataSourceHolder.getDataSource());
} catch (TCGlobalContextException e) {
//如果不存在連接,直接新建一個連接,將自動提交設定成否,然后加入到ConcurrentHashMap中,這里是為后續操作做鋪墊
LcnConnectionProxy lcnConnectionProxy = new LcnConnectionProxy(connectionCallback.call());
globalContext.setLcnConnection(groupId, DynamicDataSourceHolder.getDataSource(), lcnConnectionProxy);
lcnConnectionProxy.setAutoCommit(false);
return lcnConnectionProxy;
}
}
加入事務,這里通過類LcnRunningTransaction進行處理,這里是將preBusinessCode方法中創建事務去掉,然后將onBusinessCodeSuccess的發送事務成功的方法方法改成事務向事務服務器發送加入事務組請求,其內同步添加異步檢測方法,
public class LcnRunningTransaction implements DTXLocalControl {
private final TransactionCleanTemplate transactionCleanTemplate;
private final TransactionControlTemplate transactionControlTemplate;
@Autowired
public LcnRunningTransaction(TransactionCleanTemplate transactionCleanTemplate,
TransactionControlTemplate transactionControlTemplate) {
this.transactionCleanTemplate = transactionCleanTemplate;
this.transactionControlTemplate = transactionControlTemplate;
}
@Override
public void preBusinessCode(TxTransactionInfo info) {
DTXLocalContext.makeProxy();
}
@Override
public void onBusinessCodeError(TxTransactionInfo info, Throwable throwable) {
try {
//呼叫事務的清理方法
transactionCleanTemplate.clean(info.getGroupId(), info.getUnitId(), info.getTransactionType(), 0);
} catch (TransactionClearException e) {
log.error("{} > clean transaction error." , Transactions.LCN);
}
}
@Override
public void onBusinessCodeSuccess(TxTransactionInfo info, Object result) throws TransactionException {
log.debug("join group: [GroupId: {},Method: {}]" , info.getGroupId(),
info.getTransactionInfo().getMethodStr());
//加入事務組
transactionControlTemplate.joinGroup(info.getGroupId(), info.getUnitId(), info.getTransactionType(),
info.getTransactionInfo());
}
}
非事務服務器創建成功后,將其向事務服務器發請求,告訴事務服務器該服務器事務處理完成,將其加入事務組,加入成功后會有一個發起一個執行緒進行監督事務事務是否處理完成,對應的是事務創建服務器中的發送狀態,
public void joinGroup(String groupId, String unitId, String transactionType, TransactionInfo transactionInfo)
throws TransactionException {
try {
txLogger.txTrace(groupId, unitId, "join group > transaction type: {}", transactionType);
//這里將事務加入到事務組
reliableMessenger.joinGroup(groupId, unitId, transactionType, DTXLocalContext.transactionState(globalContext.dtxState(groupId)));
txLogger.txTrace(groupId, unitId, "join group message over.");
// 異步檢測,在這里檢測事務是否處理完成
dtxChecking.startDelayCheckingAsync(groupId, unitId, transactionType);
// 快取參與方切面資訊
aspectLogger.trace(groupId, unitId, transactionInfo);
} catch (RpcException e) {
dtxExceptionHandler.handleJoinGroupMessageException(Arrays.asList(groupId, unitId, transactionType), e);
} catch (LcnBusinessException e) {
dtxExceptionHandler.handleJoinGroupBusinessException(Arrays.asList(groupId, unitId, transactionType), e);
}
txLogger.txTrace(groupId, unitId, "join group logic over");
}
這里是具體的檢測方法,這里是通過執行緒池進行定時向服務器發送請求,獲取事物的狀態,即事物的創建者是否完成事物的創建,
public void startDelayCheckingAsync(String groupId, String unitId, String transactionType) {
txLogger.taskTrace(groupId, unitId, "start delay checking task");
ScheduledFuture scheduledFuture = scheduledExecutorService.schedule(() -> {
try {
TxContext txContext = globalContext.txContext(groupId);
if (Objects.nonNull(txContext)) {
synchronized (txContext.getLock()) {
txLogger.taskTrace(groupId, unitId, "checking waiting for business code finish.");
txContext.getLock().wait();
}
}
//這里是發送事務請求,查看是否發送成功
int state = reliableMessenger.askTransactionState(groupId, unitId);
txLogger.taskTrace(groupId, unitId, "ask transaction state {}", state);
if (state == -1) {
txLogger.error(this.getClass().getSimpleName(), "delay clean transaction error.");
//這里是發送請求后,事務失敗后的操作
onAskTransactionStateException(groupId, unitId, transactionType);
} else {
//這里是事務處理成功的操作
transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
aspectLogger.clearLog(groupId, unitId);
}
} catch (RpcException e) {
onAskTransactionStateException(groupId, unitId, transactionType);
} catch (TransactionClearException | InterruptedException e) {
txLogger.error(this.getClass().getSimpleName(), "{} clean transaction error.", transactionType);
}
}, clientConfig.getDtxTime(), TimeUnit.MILLISECONDS);
delayTasks.put(groupId + unitId, scheduledFuture);
}
事務成功后操作具體的介面為TransactionCleanService,這里對應的是三個實作類,對應三種事務處理方法,我們的是lcn,通過groupId取到資料源切面的連接代理類,
@Override
public void clear(String groupId, int state, String unitId, String unitType) throws TransactionClearException {
try {
//這里是獲取資料庫連接操作,跟上面的事務切面相對應,同時groupId取得事務組連接
Set<LcnConnectionProxy> connectionProxy = globalContext.getLcnConnection(groupId);
//取得的連接進行提交或者回滾操作
connectionProxy.forEach(con -> con.notify(state));
// todo notify exception
} catch (TCGlobalContextException e) {
log.warn("Non lcn connection when clear transaction.");
} finally {
DTXLocalContext.makeNeverAppeared();
}
}
這里是具體的事務成功或者失敗,進行提交和回滾的操作,這里具體的類為LcnConnectionProxy,對應事務的成功操作后的提交和回滾操作,這里的connection適合DataSourceAspect生成的連接是同一個,之前是通過groupId存入ConcurrentHashMap,這里再通過groupId取出對應的資料庫連接,
public RpcResponseState notify(int state) {
try {
if(!connection.isClosed()) {
if (state == 1) {
log.debug("commit transaction type[lcn] proxy connection:{}.", connection);
//事務成功進行提交操作
connection.commit();
} else {
log.debug("rollback transaction type[lcn] proxy connection:{}.", connection);
//事務失敗進行回滾操作
connection.rollback();
}
connection.close();
}
log.debug("transaction type[lcn] proxy connection:{} closed.", connection);
return RpcResponseState.success;
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
return RpcResponseState.fail;
}
}
這里是事務處理失敗的情況,是將事務發送狀態設定為失敗,然后呼叫上面的clean命令進行回滾,失敗和成功最終都會呼叫clear方法進行事物的處理,失敗釋放在例外中進行處理,但是失敗會分兩種情況,是否創建過connection,創建了就進行回滾,未創建則不需要,這里可能會存在多種情況(由于發生例外的情況不同導致,有些是可以提交的,有些不能)這里不詳細介紹,具體思路即能提交的就調將狀態設定成1,然后clear方法進行提交,不能提交的則將狀態設定成非1,
@Override
public void handleNotifyGroupBusinessException(Object params, Throwable ex) {
List paramList = (List) params;
String groupId = (String) paramList.get(0);
int state = (int) paramList.get(1);
String unitId = (String) paramList.get(2);
String transactionType = (String) paramList.get(3);
if(null != ex) {
//用戶強制回滾.
if (ex instanceof UserRollbackException) {
state = 0;
}
if ((ex.getCause() != null && ex.getCause() instanceof UserRollbackException)) {
state = 0;
}
}
else {
// LCN例外
state = 0;
}
// 結束事務
try {
transactionCleanTemplate.clean(groupId, unitId, transactionType, state);
} catch (TransactionClearException e) {
txLogger.error(groupId, unitId, "notify group", "{} > clean transaction error.", transactionType);
}
}
事物的groupId傳遞,Tracings這里通過將請求頭進行處理,這里是通過restTemplate攔截器進行處理(其它方式如dubbo、feign可自行查看原始碼),具體思路是事務創建者將groupId存入請求頭,事務的加入者會根據請求頭來獲取groupId然后根據groupId初始化TracingContext,
public class RestTemplateTracingTransmitter implements ClientHttpRequestInterceptor {
@Autowired
public RestTemplateTracingTransmitter(@Autowired(required = false) List<RestTemplate> restTemplates) {
if (Objects.nonNull(restTemplates)) {
restTemplates.forEach(restTemplate -> {
List<ClientHttpRequestInterceptor> interceptors = restTemplate.getInterceptors();
interceptors.add(interceptors.size(), RestTemplateTracingTransmitter.this);
});
}
}
@Override
@NonNull
public ClientHttpResponse intercept(
@NonNull HttpRequest httpRequest, @NonNull byte[] bytes,
@NonNull ClientHttpRequestExecution clientHttpRequestExecution) throws IOException {
//這里是Lambda運算式寫法httpRequest.getHeaders()::add是對TracingSetter的set方法的實作
Tracings.transmit(httpRequest.getHeaders()::add);
return clientHttpRequestExecution.execute(httpRequest, bytes);
}
}
//這里springMvc的配置類,可以進行具體業務處理前的一些準備作業如解碼
public class SpringTracingApplier implements com.codingapi.txlcn.tracing.http.spring.HandlerInterceptor, WebMvcConfigurer {
@Override
public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
//這里是Lambda運算式寫法httpRequest.getHeaders()::add是對TracingGetter的get方法的實作
Tracings.apply(request::getHeader);
return true;
}
//將攔截器加入到容器
@Override
public void addInterceptors(InterceptorRegistry registry) {
registry.addInterceptor(this);
}
}
public class Tracings {
//私有構造器,保證了該方法不會被外部直接構建,確保gourpId能夠被正確傳遞
private Tracings() {
}
public static void transmit(TracingSetter tracingSetter) {
//請求發起前,將groupId存入報文頭
if (TracingContext.tracing().hasGroup()) {
log.debug("tracing transmit group:{}", TracingContext.tracing().groupId());
tracingSetter.set(TracingConstants.HEADER_KEY_GROUP_ID, TracingContext.tracing().groupId());
tracingSetter.set(TracingConstants.HEADER_KEY_APP_MAP, Base64Utils.encodeToString(TracingContext.tracing(). appMapString().getBytes(StandardCharsets.UTF_8)));
}
}
public static void apply(TracingGetter tracingGetter) {
String groupId = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_GROUP_ID)).orElse("");
String appList = Optional.ofNullable(tracingGetter.get(TracingConstants.HEADER_KEY_APP_MAP)).orElse("");
//這里和之前的init方法一致,這里是通過獲取報文頭的獲取groupId進行初始化,保證事務創建者的groupId能傳過來
TracingContext.init(Maps.newHashMap(TracingConstants.GROUP_ID, groupId, TracingConstants.APP_MAP,
StringUtils.isEmpty(appList) ? appList : new String(Base64Utils.decodeFromString(appList), StandardCharsets.UTF_8)));
if (TracingContext.tracing().hasGroup()) {
log.debug("tracing apply group:{}, app map:{}", groupId, appList);
}
}
public interface TracingSetter {
void set(String key, String value);
}
public interface TracingGetter {
String get(String key);
}
}
這是lcn分布式事務的處理的具體思路,這里介紹的還是有點缺陷的,事物的補償并未考慮(概率很小),即當事務全部處理完成后,事務服務器收到事務創建者的資訊后,事務服務器或者后面用到的服務器夯機如何處理,也就是創建者服務器的事務提交了,其他服務器的事務提交不了,我這里想法是通過redis對應事務組資訊去進行補償處理,各位小伙伴也可以考慮下如何保證進行 補償或者如何保證事務的強一致性,
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/33101.html
標籤:其他
下一篇:1萬小時定律,各位還差多少小時?
