5. 分布式事務解決方案之可靠訊息最終一致性
5.1. 什么是可靠訊息最終一致性事務
可靠訊息最終一致性方案是指當事務發起執行完全本地事務后并發出一條訊息,事務參與方(訊息消費者)一定能夠接收訊息并處理事務成功,此方案強調的是只要訊息發給事務參與方最終事務要達到一致,
此方案是利用訊息中間件完成,如下圖:
事務發起方(訊息生產方)將訊息發給訊息中間件,事務參與方從訊息中間件接收訊息,事務發起方和訊息中間件之間,事務參與方(訊息消費方)和訊息中間件之間都是通過網路通信,由于網路通信的不確定性導致分布式事務問題,
因此可靠訊息最終一致性方案要解決以下幾個問題 :
1、本地事務與訊息發送的原子性問題
本地事務與訊息發送的原子性問題即 :事務發起方在本地事務執行成功后訊息必須發出去,否則就丟棄訊息,即實作本地事務和訊息發送的原子性,要么都成功,要么都失敗,本地事務與訊息發送的原子性問題是實作可靠訊息最終一致性方案的關鍵問題,
先來嘗試下這種操作,先發送訊息,再操作資料庫 :
begin transaction;
// 1.發送MQ
// 2.資料庫操作
commit transation;
這種情況下無法保證資料庫操作與發送訊息的一致性,因為可能發送訊息成功,資料庫操作失敗,
你立馬想到第二種方案,先進行資料庫操作,再發送訊息 :
begin transaction;
// 1.資料庫操作
// 2.發送MQ
commit transation;
這種情況下貌似沒有問題,如果發送MQ訊息失敗,就會拋出例外,導致資料庫事務回滾,但如果是超時例外,資料庫回滾,但MQ其實已經正常發送來,同樣會導致不一致,
2、事務參與方接收訊息的可靠性
事務參與方必須能夠從訊息佇列接收到訊息,如果接收訊息失敗可以重復接收訊息,
3、訊息重復消費的問題
由于網路2的存在,若某一個消費節點超時但是消費成功,此時訊息中間件會重復投遞此訊息,就導致來訊息的重復消費,要解決訊息重復消費的問題就要實作事務參與方的方法冪等性,
5.2. 解決方案
5.2.1. 本地訊息表方案
本地訊息表這個方案最初是eBay提出的,此方案的核心是通過本地事務保證資料業務操作和訊息的一致性,然后通過定時任務將訊息發送至訊息中間件,待確認訊息發送給消費方成功再將訊息洗掉,
下面以注冊送積分為例來說明 :
下例共有兩個微服務互動,用戶服務和積分服務,用戶服務負責添加用戶,積分服務負責增加積分,
互動流程如下 :
1、用戶注冊
用戶服務在本地事務新增用戶和增加“積分訊息日志”,(用戶表和訊息表通過本地事務保證一致)
下表是偽代碼
begin transaction;
// 1.新增用戶
// 2.存盤積分訊息日志
commit transation;
這種情況下,本地資料庫操作與存盤積分訊息日志處于同一事務中,本地資料庫操作與記錄訊息日志操作具備原子性,
2、定時任務掃描日志
如何保證將訊息發送給訊息佇列呢?
經過第一步訊息已經寫到訊息日志表中,可以啟動獨立的執行緒,定時對訊息日志表中的訊息進行掃描并發送至訊息中間件,在訊息中間件反饋發送成功后洗掉該訊息日志,否則等待定時任務下一周期重試,
3、消費訊息
如何保證消費者一定能消費到訊息呢?
這里可以使用MQ的ack(即訊息確認)機制,消費者監聽MQ,如果消費者接收到訊息并且業務處理完成后向MQ發送ack(即訊息確認),此時說明消費者正常消費訊息完成,MQ將不再向消費者推送訊息,否則消費者會不斷重試向消費者來發送訊息,
積分服務接收到“增加積分”訊息,開始增加積分,積分增加成功后訊息中間件回應ack,否則訊息中間件將重復投遞此訊息,
由于訊息會重復投遞,積分服務的“增加積分”功能需要實作冪等性,
5.2.2. RocketMQ事務訊息方案
RocketMQ是一個來自阿里巴巴的分布式訊息中間件,于2012年開源,并在2017年正式成為Apache頂級專案,據了解,包括阿里云上的訊息產品以及收購的子公司在內,阿里集團的訊息產品全線都運行在RocketMQ之上,并且最近幾年的雙十一大促中,RocketMQ都有搶眼表現,Apache RocketMQ 4.3之后的版本正式支持事務訊息,為分布式事務實作提供來便利性支持,
RocketMQ事務訊息設計則主要是為了解決Producer端的訊息發送與本地事務執行的原子性問題,RocketMQ的設計中broker與producer端的雙向通信能力,使得broker天生可以作為一個事務協調者存在;而RocketMQ本身提供的存盤機制為事務訊息提供了持久化能力;RocketMQ的高可用機制以及可靠訊息設計則為事務訊息在系統發生例外時依然能夠保證達成事務的最終一致性,
在RocketMQ 4.3后實作了完整的事務訊息,實際上其實是對本地訊息表的一個封裝,將本地訊息表移動到了MQ內部,解決Producer端的訊息發送與本地事務執行的原子性問題,
執行流程如下 :
為方便理解我們還以注冊送積分的例子來描述整個流程,
Producer即MQ發送方,本例中是用戶服務,負責新增用戶,MQ訂閱方即訊息消費方,本例中是積分服務,負責新增積分,
1、Producer發送事務訊息
Producer(MQ發送方)發送事務訊息至MQ Server,MQ Server將訊息狀態標記為Prepared(預覽狀態),注意此時這條訊息消費者(MQ訂閱方)是無法消費到的,
2、MQ Server回應訊息發送成功
MQ Server接收到Producer發送給的訊息則回應發送成功表示MQ已接收到訊息,
3、Producer執行本地事務
Producer端執行業務代碼邏輯,通過本地資料庫事務控制,
本例中,Producer執行添加用戶操作,
4、訊息投遞
若Producer本地事務執行成功則自動向MQ Server發送commit訊息,MQ Server接收到commit訊息后將“增加積分訊息”狀態標記為可消費,此時MQ訂閱方(積分服務)即正常消費訊息;
若Producer 本地事務執行失敗則自動向MQ Server發送rollback訊息,MQ Server接收到rollback訊息后將洗掉“增加積分訊息”,
MQ訂閱方(積分服務)消費訊息,消費成功則向MQ回應ack,否則將重復接收訊息,這里ack默認自動回應,即程式執行正常則自動回應ack,
5、事務回查
如果執行Producer端本地事務程序中,執行端掛掉,或者超時,MQ Server將會不停的詢問同組的其他Producer來獲取事務執行狀態,這個程序叫事務回查,MQ Server會根據事務回查結果來決定是否投遞訊息,
以上主干流程已由RocketMQ實作,對用戶則來說,用戶需要分別實作本地事務執行以及本地事務回查方法,因此只需關注本地事務的執行狀態即可,
RocketMQ提供RocketMQLocalTransactionListener介面 :
public interface RocketMQLocalTransactionListener {
/**
發送prepare訊息成功此方法被回呼,該方法用于執行本地事務
@param msg 回傳的訊息,利用transactionId即可獲取到該訊息的唯一Id
@param arg 呼叫send方法時傳遞的引數,當send時候若有額外的引數可以傳遞到send方法中,這里能獲取到
@return 回傳事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回呼
*/
RocketMQLocalTransactionState executeLocalTransaction(Message msg,Object arg);
/**
@param msg 通過獲取transactionId來判斷這條訊息的本地事務執行狀態
@return 回傳事務狀態,COMMIT :提交 ROLLBACK :回滾 UNKNOW :回呼
*/
RocketMQLocalTransactionState checkLocalTransaction(Message msg);
}
- 發送事務訊息 :
以下是RocketMQ提供用于發送事務訊息的API :
TransactionMQProducer producer = new TransactionMQProducer("ProducerGroup");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
// 設定TransactionListener實作
producer.setTransactionListener(transactionListener);
// 發送事務訊息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
5.3. RocketMQ實作可靠訊息最終一致性事務
5.3.1. 業務說明
本實體通過RocketMQ中間件實作可靠訊息最終一致性分布式事務,模擬兩個賬戶的轉賬交易程序,
兩個賬戶在分別在不同的銀行(張三在bank1、李四在bank2),bank1、bank2是兩個微服務,交易程序是,張三給李四轉賬指定金額,
上述交易步驟,張三扣減金額與給bank2發轉賬訊息,兩個操作必須是一個整體性的事務,
5.3.2.程式組成部分
本示例程式組成部分如下: 資料庫:MySQL-5.7.25
包括bank1和bank2兩個資料庫,
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服務框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE 微服務及資料庫的關系 :
dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank1 銀行1,操作張三賬戶, 連接資料庫bank1 dtx/dtx-txmsg-demo/dtx-txmsg-demo-bank2 銀行2,操作李四賬戶,連接資料庫bank2
本示例程式技術架構如下 :
互動流程如下 :
1、Bank1向MQ Server發送轉賬訊息;
2、Bank1執行本地事務,扣減金額;
3、Bank2接收訊息,執行本地事務,添加金額,
5.3.3. 創建資料庫
創建bank1庫,并匯入以下表結構和資料(包含張三賬戶)
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行 卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);
創建bank2庫,并匯入以下表結構和資料(包含李四賬戶)
CREATE DATABASE `bank2` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行
卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的賬戶', '2', NULL, 0);
在bank1、bank2資料庫中新增de_duplication,交易記錄表(去重表),用于交易冪等控制,
DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` (
`tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
5.3.4. 啟動RocketMQ
(1)下載RocketMQ服務器
下載地址 :http://mirrors.tuna.tsinghua.edu.cn/apache/rocketmq/4.5.0/rocketmq-all-4.5.0-bin- release.zip
(2)解壓并啟動
啟動nameserver :
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑]
start [rocketmq服務端解壓路徑]/bin/mqnamesrv.cmd
啟動broker:
set ROCKETMQ_HOME=[rocketmq服務端解壓路徑]
start [rocketmq服務端解壓路徑]/bin/mqbroker.cmd ‐n 127.0.0.1:9876 autoCreateTopicEnable=true
3.3.5 工程概述
(1)父工程maven依賴說明
在父工程中指定來SpringBoot和SpringCloud版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐dependencies</artifactId>
<version>2.1.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring‐cloud‐dependencies</artifactId>
<version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope> </dependency>
在dtx-txmsg-demo父工程中指定來rocketmq-spring-boot-starter的版本
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq‐spring‐boot‐starter</artifactId>
<version>2.0.2</version>
</dependency>
(2)配置rocketMQ
application-local.properties中配置rocketMQ nameServer地址及生產組 :
rocketmq.producer.group = producer_bank2
rocketmq.name‐server = 127.0.0.1:9876
3.3.6 dtx-txmsg-demo-bank1
dtx-txmsg-demo-bank1實作如下功能:
1、張三扣減金額,提交本地事務,
2、向MQ發送轉賬訊息,
2)Dao
@Mapper
@Component
public interface AccountInfoDao {
@Update("update account_info set account_balance=account_balance+#{amount} where account_no=# {accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
@Select("select count(1) from de_duplication where tx_no = #{txNo}") int isExistTx(String txNo);
@Insert("insert into de_duplication values(#{txNo},now());") int addTx(String txNo);
}
3)AccountInfoService
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Resource
private RocketMQTemplate rocketMQTemplate;
@Autowired
private AccountInfoDao accountInfoDao;
/**
* 更新帳號余額‐發送訊息
* producer向MQ Server發送訊息 *
* @param accountChangeEvent */
@Override
public void sendUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
//構建訊息體
JSONObject jsonObject = new JSONObject(); jsonObject.put("accountChange",accountChangeEvent); Message<String> message =
MessageBuilder.withPayload(jsonObject.toJSONString()).build(); TransactionSendResult sendResult =
rocketMQTemplate.sendMessageInTransaction("producer_group_txmsg_bank1", "topic_txmsg", message, null);
log.info("send transcation message body={},result= {}",message.getPayload(),sendResult.getSendStatus());
}
/**
* 更新帳號余額‐本地事務
* producer發送訊息完成后接收到MQ Server的回應即開始執行本地事務 *
* @param accountChangeEvent */
@Transactional
@Override
public void doUpdateAccountBalance(AccountChangeEvent accountChangeEvent) {
log.info("開始更新本地事務,事務號:{}",accountChangeEvent.getTxNo()); accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount() * ‐1);
}
}
//為冪等作準備 accountInfoDao.addTx(accountChangeEvent.getTxNo()); if(accountChangeEvent.getAmount() == 2){
throw new RuntimeException("bank1更新本地事務時拋出例外"); log.info("結束更新本地事務,事務號:{}",accountChangeEvent.getTxNo());
}
}
4)RocketMQLocalTransactionListener
撰寫RocketMQLocalTransactionListener介面實作類,實作執行本地事務和事務回查兩個方法,
@Component
@Slf4j
@RocketMQTransactionListener(txProducerGroup = "producer_group_txmsg_bank1") public class ProducerTxmsgListener implements RocketMQLocalTransactionListener {
@Autowired
AccountInfoService accountInfoService;
@Autowired
AccountInfoDao accountInfoDao;
//訊息發送成功回呼此方法,此方法執行本地事務
@Override
@Transactional
public RocketMQLocalTransactionState executeLocalTransaction(Message message, Object arg) {
//決議訊息內容
try {
String jsonString = new String((byte[]) message.getPayload());
JSONObject jsonObject = JSONObject.parseObject(jsonString); AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"), AccountChangeEvent.class); //扣除金額
accountInfoService.doUpdateAccountBalance(accountChangeEvent);
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("executeLocalTransaction 事務執行失敗",e); e.printStackTrace();
return RocketMQLocalTransactionState.ROLLBACK;
}
}
//此方法檢查事務執行狀態
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message message) {
RocketMQLocalTransactionState state;
final JSONObject jsonObject = JSON.parseObject(new String((byte[]) message.getPayload()));
AccountChangeEvent accountChangeEvent = JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
//事務id
String txNo = accountChangeEvent.getTxNo();
int isexistTx = accountInfoDao.isExistTx(txNo);
log.info("回查事務,事務號: {} 結果: {}", accountChangeEvent.getTxNo(),isexistTx); if(isexistTx>0){
state= RocketMQLocalTransactionState.COMMIT;
}else{
state= RocketMQLocalTransactionState.UNKNOWN;
}
return state;
}
}
4)Controller
@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
@GetMapping(value = "/transfer")
public String transfer(@RequestParam("accountNo")String accountNo,@RequestParam("amount") Double amount){
String tx_no = UUID.randomUUID().toString();
AccountChangeEvent accountChangeEvent = new AccountChangeEvent(accountNo,amount,tx_no);
accountInfoService.sendUpdateAccountBalance(accountChangeEvent);
return "轉賬成功";
}
}
3.3.7 dtx-txmsg-demo-bank2
dtx-txmsg-demo-bank2需要實作如下功能 :
1、監聽MQ,接收訊息,
2、接收到訊息增加賬戶金額,
1)Service
注意為避免訊息重復發送,這里需要實作冪等,
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
/**
* 消費訊息,更新本地事務,添加金額 * @param accountChangeEvent
*/
@Override
@Transactional
public void addAccountInfoBalance(AccountChangeEvent accountChangeEvent) {
log.info("bank2更新本地賬號,賬號:{},金額: {}",accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
//冪等校驗
int existTx = accountInfoDao.isExistTx(accountChangeEvent.getTxNo()); if(existTx<=0){
//執行更新 accountInfoDao.updateAccountBalance(accountChangeEvent.getAccountNo(),accountChangeEvent.getAmount());
}
}
}
//添加事務記錄
accountInfoDao.addTx(accountChangeEvent.getTxNo());
log.info("更新本地事務執行成功,本次事務號: {}", accountChangeEvent.getTxNo());
}else{
log.info("更新本地事務執行失敗,本次事務號: {}", accountChangeEvent.getTxNo());
}
}
}
2)MQ監聽類
@Component
@RocketMQMessageListener(topic = "topic_txmsg",consumerGroup = "consumer_txmsg_group_bank2")
@Slf4j
public class TxmsgConsumer implements RocketMQListener<String> {
@Autowired
AccountInfoService accountInfoService;
@Override
public void onMessage(String s) {
log.info("開始消費訊息:{}",s);
//決議訊息為物件
final JSONObject jsonObject = JSON.parseObject(s); AccountChangeEvent accountChangeEvent =
JSONObject.parseObject(jsonObject.getString("accountChange"),AccountChangeEvent.class);
//呼叫service增加賬號金額 accountChangeEvent.setAccountNo("2"); accountInfoService.addAccountInfoBalance(accountChangeEvent);
}
}
5.3.8 測驗場景
- bank1本地事務失敗,則bank1不發送轉賬訊息,
- bank2接收轉賬訊息失敗,會進行重試發送訊息,
- bank2多次消費同一個訊息,實作冪等,
5.4 小結
可靠訊息最終一致性就是保證訊息從生產方經過訊息中間件傳遞到消費方的一致性,本案例使用了RocketMQ作為訊息中間件,RocketMQ主要解決了兩個功能 :
1、本地事務與訊息發送的原子性問題,
2、事務參與方接收訊息的可靠性,
可靠訊息最終一致性事務適合執行周期長且實時性要求不高的場景,引入訊息機制后,同步的事務操作變為基于訊息執行的異步操作,避免了分布式事務中的同步阻塞操作的影響,并實作了兩個服務的解耦,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/35263.html
標籤:架構設計
上一篇:從程式員到架構師,有捷徑嗎?
下一篇:用Helm3構建多層微服務
