6.分布式事務解決方案之最大努力通知
6.1. 什么是最大努力通知
最大努力通知也是一種解決分布式事務的方案,下邊是一個是充值的例子:
互動流程 :
1、賬戶系統呼叫充值系統介面
2、充值系統完成支付處理向賬戶系統發起充值結果通知
若通知失敗,則充值系統按策略進行重復通知
3、賬戶系統接收到充值結果通知修改充值狀態
4、賬戶系統未接收到通知會主動呼叫充值系統的介面查詢充值結果
通過上邊的例子我們總結最大努力通知方案的目標 :
目標 :發起通知方通過一定的機制最大努力將業務處理結果通知到接收方,
具體包括 :
1、有一定的訊息重復通知機制,
因為接收通知方可能沒有接收到通知,此時要有一定的機制對訊息重復通知,
2、訊息校對機制,
如果盡最大努力也沒有通知到接收方,或者接收方消費訊息后要再次消費,此時可由接收方主動向通知方查詢訊息資訊來滿足需求,
最大努力通知與可靠訊息一致性有什么不同?
1、解決方案思想不同
可靠訊息一致性,發起通知方需要保證將訊息發出去,并且將訊息發到接收通知方,訊息的可靠性關鍵由發起通知方來保證,
最大努力通知,發起通知方盡最大的努力將業務處理結果通知為接收通知方,但是可能訊息接收不到,此時需要接收通知方主動呼叫發起通知方的介面查詢業務處理結果,通知的可靠性關鍵在接收通知方,
2、兩者的業務應用場景不同
可靠訊息一致性關注的是交易程序的事務一致,以異步的方式完成交易,
最大努力通知關注的是交易后的通知事務,即將交易結果可靠的通知出去,
3、技術解決方向不同
可靠訊息一致性要解決訊息從發出到接收的一致性,即訊息發出并且被接收到,
最大努力通知無法保證訊息從發出到接收的一致性,只提供訊息接收的可靠性機制,可靠機制是,最大努力的將訊息通知給接收方,當訊息無法被接收方接收時,由接收方主動查詢消費(業務處理結果),
6.2. 解決方案
通過對最大努力通知的理解,采用MQ的ack機制就可以實作最大努力通知,
方案1 :
本方案是利用MQ的ack機制由MQ向接收通知方發送通知,流程如下 :
1、發起通知方將通知發給MQ,
使用普通訊息機制將通知發給MQ,
注意 :如果訊息沒有發出去可由接收通知方主動請求發起通知方查詢業務執行結果,
2、接收通知方監聽MQ,
3、接收通知方接收訊息,業務處理完成回應ack,
4、接收通知方若沒有回應ack則MQ會重復通知,
MQ會按照間隔1min、5min、10min、30min、1h、2h、5h、10h的方式,逐步拉大通知間隔(如果MQ采用rocketMq,在broker中可進行配置),直到達到通知要求的時間視窗上限,
5、接收通知方可通過訊息校對介面來校對訊息的一致性,
方案2 :
本方案也是利用MQ的ack機制,與方案1不同的是應用程式向接收通知方發送通知,如下圖 :
互動流程如下 :
1、發起通知方將通知發給MQ,
使用可靠訊息一致方案中的事務訊息保證本地事務與訊息的原子性,最終將通知先發給MQ,
2、通知程式監聽MQ,接收MQ的訊息,
方案1中接收通知方直接監聽MQ,方案2中由通知程式監聽MQ,
通知程式若沒有回應ack則MQ會重復通知,
3、通知程式通過互聯網介面協議(如http、webservice)呼叫接收通知方案介面,完成通知,
通知程式呼叫接收通知方案介面成功就表示通知成功,即消費MQ訊息成功,MQ將不再向通知程式投遞通知訊息,
4、接收通知方可通過訊息校對介面來校對訊息的一致性,
方案1和方案2的不同點 :
1、方案1中接收通知方與MQ介面,即接收通知方案監聽MQ,此方案主要應用與內部應用之間的通知,
2、方案2中由通知程式與MQ介面,通知程式監聽MQ,收到MQ的訊息后由通知程式通過互聯網介面協議呼叫接收通知方,此方案主要應用于外部應用之間的通知,例如支付寶、微信的支付結果通知,
6.3.RocketMQ實作最大努力通知型事務
6.3.1.業務說明
本實體通過RocketMq中間件實作最大努力通知型分布式事務,模擬充值程序,
本案例有賬戶系統和充值系統兩個微服務,其中賬戶系統的資料庫是bank1資料庫,其中有張三賬戶,充值系統的資料庫使用bank1_pay資料庫,記錄了賬戶的充值記錄,
業務流程如下圖 :
互動流程如下 :
1、用戶請求充值系統進行充值,
2、充值系統完成充值將充值結果發給MQ,
3、賬戶系統監聽MQ,接收充值結果通知,如果接收不到訊息,MQ會重復發送通知,接收到充值結果通知賬戶系統增加充值金額,
4、賬戶系統也可以主動查詢充值系統的充值結果查詢介面,增加金額,
6.3.2.程式組成部分
本示例程式組成部分如下 :
資料庫:MySQL-5.7.25
包括bank1和bank1_pay兩個資料庫,
JDK:64位 jdk1.8.0_201
rocketmq 服務端:RocketMQ-4.5.0
rocketmq 客戶端:RocketMQ-Spring-Boot-starter.2.0.2-RELEASE 微服務框架:spring-boot-2.1.3、spring-cloud-Greenwich.RELEASE
微服務及資料庫的關系 :
dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-bank1 銀行1,操作張三賬戶, 連接資料庫bank1 dtx/dtx-notifymsg-demo/dtx-notifymsg-demo-pay 銀行2,操作充值記錄,連接資料庫bank1_pay
互動流程如下 :
1、用戶請求充值系統進行充值,
2、充值系統完成充值將充值結果發給MQ,
3、賬戶系統監聽MQ,接收充值結果通知,如果接收不到訊息,MQ會重復發送通知,接收到充值結果通知賬戶系統增加充值金額,
4、賬戶系統也可以主動查詢充值系統的充值結果查詢介面,增加金額,
6.3.3.創建資料庫
創建bank1庫,并匯入以下表結構和資料(包含張三賬戶)
CREATE DATABASE `bank1` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci';
DROP TABLE IF EXISTS `account_info`; CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '戶 主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '銀行卡號',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帳戶密碼',
`account_balance` double NULL DEFAULT NULL COMMENT '帳戶余額', PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
INSERT INTO `account_info` VALUES (2, '張三的賬戶', '1', '', 10000);
DROP TABLE IF EXISTS `de_duplication`; CREATE TABLE `de_duplication` (
`tx_no` varchar(64) COLLATE utf8_bin NOT NULL, `create_time` datetime(0) NULL DEFAULT NULL, PRIMARY KEY (`tx_no`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
創建bank1_pay庫,并匯入以下表結構:
CREATE DATABASE `bank1_pay` CHARACTER SET 'utf8' COLLATE 'utf8_general_ci'; CREATE TABLE `account_pay` (
`id` varchar(64) COLLATE utf8_bin NOT NULL,
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '賬號', `pay_amount` double NULL DEFAULT NULL COMMENT '充值余額',
`result` varchar(20) COLLATE utf8_bin DEFAULT NULL COMMENT '充值結果:success,fail',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT = Dynamic;
6.3.4.啟動RocketMQ
rocketMQ啟動方式與RocketMQ實作可靠訊息最終一致性事務中完全一致
6.3.5.discover-server
discover-server是服務注冊中心,測驗工程將自己注冊至discover-server,
6.3.6.工程概述
(1)父工程maven依賴說明
在dtx父工程中指定了SpringBoot和SpringCloud版本
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring‐boot‐dependencies</artifactId> <version>2.1.3.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring‐cloud‐dependencies</artifactId> <version>Greenwich.RELEASE</version>
<type>pom</type>
<scope>import</scope>
</dependency>
在dtx-notifymsg-demo父工程中指定了rocketmq-spring-boot-starter的版本,
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq‐spring‐boot‐starter</artifactId>
<version>2.0.2</version>
</dependency>
(2)配置rocketMQ
在application-local.properties中配置rocketMQ nameServer地址及生產組 :
rocketmq.producer.group = producer_bank2
rocketmq.name-server = 127.0.0.1:9876
6.3.7 dtx-notifydemo-pay
dtx-notifydemo-pay實作如下功能 :
1、充值介面;
2、充值完成要通知;
3、充值結果查詢介面,
(2)Dao
@Mapper
@Component
public interface AccountPayDao {
@Insert("insert into account_pay(id,account_no,pay_amount,result) values(#{id},# {accountNo},#{payAmount},#{result})")
int insertAccountPay(@Param("id") String id,@Param("accountNo") String accountNo, @Param("payAmount") Double pay_amount,@Param("result") String result);
@Select("select id,account_no accountNo,pay_amount payAmount,result from account_pay where id=#{txNo}")
AccountPay findByIdTxNo(@Param("txNo") String txNo);
}
(3)Service
@Service
@Slf4j
public class AccountPayServiceImpl implements AccountPayService{
@Autowired
RocketMQTemplate rocketMQTemplate;
@Autowired
AccountPayDao accountPayDao;
@Transactional
@Override
public AccountPay insertAccountPay(AccountPay accountPay) {
int result = accountPayDao.insertAccountPay(accountPay.getId(),
accountPay.getAccountNo(), accountPay.getPayAmount(), "success");
if(result>0){ //發送通知
rocketMQTemplate.convertAndSend("topic_notifymsg",accountPay);
return accountPay;
}
return null;
}
@Override
public AccountPay getAccountPay(String txNo) {
AccountPay accountPay = accountPayDao.findByIdTxNo(txNo);
return accountPay;
}
}
(4)Controller
@RestController
public class AccountPayController {
@Autowired
AccountPayService accountPayService;
//充值
@GetMapping(value = "/paydo")
public AccountPay pay(AccountPay accountPay){
//事務號
String txNo = UUID.randomUUID().toString(); accountPay.setId(txNo);
return accountPayService.insertAccountPay(accountPay);
}
//查詢充值結果
@GetMapping(value = "/payresult/{txNo}")
public AccountPay payresult(@PathVariable("txNo") String txNo){
return accountPayService.getAccountPay(txNo);
}
}
6.3.8 dtx-notifydemo-bank1
dtx-notifydemo-bank1實作如下功能 :
1、監聽MQ,接收充值結果,根據充值結果完成賬戶金額修改,
2、主動查詢充值系統,根據充值結果完成賬戶金額修改,
1)Dao
@Mapper
@Component
public interface AccountInfoDao {
//修改賬戶金額
@Update("update account_info set account_balance=account_balance+#{amount} where account_no=#{accountNo}")
int updateAccountBalance(@Param("accountNo") String accountNo, @Param("amount") Double amount);
//查詢冪等記錄,用于冪等控制
@Select("select count(1) from de_duplication where tx_no = #{txNo}")
int isExistTx(String txNo);
//添加事務記錄,用于冪等控制
@Insert("insert into de_duplication values(#{txNo},now());")
int addTx(String txNo);
}
2)AccountInfoService
@Service
@Slf4j
public class AccountInfoServiceImpl implements AccountInfoService {
@Autowired
AccountInfoDao accountInfoDao;
@Autowired
PayClient payClient; /**
* 更新帳號余額,并發送訊息 *
* @param accountChange */
@Transactional
@Override
public void updateAccountBalance(AccountChangeEvent accountChange) {
//冪等校驗
int existTx = accountInfoDao.isExistTx(accountChange.getTxNo()); if(existTx >0){
log.info("已處理訊息:{}", JSONObject.toJSONString(accountChange));
return ; }
//添加事務記錄 accountInfoDao.addTx(accountChange.getTxNo()); //更新賬戶金額
accountInfoDao.updateAccountBalance(accountChange.getAccountNo(),accountChange.getAmount()); }
/**
* 主動查詢充值結果 *
* @param tx_no */
@Override
public AccountPay queryPayResult(String tx_no) {
//主動請求充值系統查詢充值結果
AccountPay accountPay = payClient.queryPayResult(tx_no); //充值結果
String result = accountPay.getResult();
log.info("主動查詢充值結果:{}", JSON.toJSONString(accountPay));
if("success".equals(result)){
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAccountNo(accountPay.getAccountNo());
accountChangeEvent.setAmount(accountPay.getPayAmount());
accountChangeEvent.setTxNo(accountPay.getId());
updateAccountBalance(accountChangeEvent);
}
return accountPay;
}
}
@FeignClient(value = "dtx‐notifymsg‐demo‐pay", fallback = PayFallback.class)
public interface PayClient {
@GetMapping("/pay/payresult/{txNo}")
AccountPay queryPayResult(@PathVariable("txNo") String txNo);
}
@Component
public class PayFallback implements PayClient {
@Override
public AccountPay queryPayResult(String txNo) {
AccountPay accountPay = new AccountPay();
accountPay.setResult("fail");
return accountPay;
}
}
3)監聽MQ
@Component
@Slf4j
@RocketMQMessageListener(topic="topic_notifymsg",consumerGroup="consumer_group_notifymsg_bank1")
public class NotifyMsgListener implements RocketMQListener<AccountPay> {
@Autowired
AccountInfoService accountInfoService;
@Override
public void onMessage(AccountPay accountPay) {
log.info("接收到訊息:{}", JSON.toJSONString(accountPay));
AccountChangeEvent accountChangeEvent = new AccountChangeEvent();
accountChangeEvent.setAmount(accountPay.getPayAmount());
accountChangeEvent.setAccountNo(accountPay.getAccountNo());
accountChangeEvent.setTxNo(accountPay.getId());
accountInfoService.updateAccountBalance(accountChangeEvent);
log.info("處理訊息完成:{}", JSON.toJSONString(accountChangeEvent));
}
}
4)Controller
@RestController
@Slf4j
public class AccountInfoController {
@Autowired
private AccountInfoService accountInfoService;
//主動查詢充值結果
@GetMapping(value = "/payresult/{txNo}")
public AccountPay result(@PathVariable("txNo") String txNo){
AccountPay accountPay = accountInfoService.queryPayResult(txNo);
return accountPay;
}
}
6.3.9 測驗場景
- 充值系統充值成功,賬戶系統主動查詢充值結果,修改賬戶金額,
- 充值系統充值成功,發送訊息,賬戶系統接收訊息,修改賬戶金額,
- 賬戶系統修改賬戶金額冪等測驗,
6.4. 小結
最大努力通知方案是分布式事務中對一致性要求最低的一種,適用于一些最終一致性時間敏感度低的業務;
最大努力通知方案需要實作如下功能 :
1、訊息重復通知機制,
2、訊息校對機制,
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/35284.html
標籤:架構設計
